blob: c5acf3f69d508a855a87dcb823e0108d1189bdbb [file] [log] [blame]
/**
* PutOPC class definition
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>
#include <string>
#include <list>
#include <map>
#include <mutex>
#include <thread>
#include "opc.h"
#include "putopc.h"
#include "utils/ByteArrayCallback.h"
#include "FlowFileRecord.h"
#include "core/Processor.h"
#include "core/ProcessSession.h"
#include "core/Core.h"
#include "core/Property.h"
#include "core/Resource.h"
#include "controllers/SSLContextService.h"
#include "core/logging/LoggerConfiguration.h"
#include "utils/Id.h"
#include "utils/StringUtils.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
// ---------------------------------------------------------------------------
// Static property definitions.
//
// The "Parent node ..." and "Value type" properties are read once in
// onSchedule(); the "Target node ..." properties support expression language
// and are evaluated against each incoming flow file in onTrigger().
// ---------------------------------------------------------------------------

// ID of the parent node; stored in nodeID_ by onSchedule().
core::Property PutOPCProcessor::ParentNodeID(
    core::PropertyBuilder::createProperty("Parent node ID")
        ->withDescription("Specifies the ID of the root node to traverse")
        ->isRequired(true)->build());

// How ParentNodeID is to be interpreted (browse path, numeric ID or string ID).
core::Property PutOPCProcessor::ParentNodeIDType(
    core::PropertyBuilder::createProperty("Parent node ID type")
        ->withDescription("Specifies the type of the provided node ID")
        ->isRequired(true)
        ->withAllowableValues<std::string>({"Path", "Int", "String"})->build());

// Namespace index of the parent node; onSchedule() reads it whenever the ID type is not "Path".
core::Property PutOPCProcessor::ParentNameSpaceIndex(
    core::PropertyBuilder::createProperty("Parent node namespace index")
        ->withDescription("The index of the namespace. Used only if node ID type is not path.")
        ->withDefaultValue<int32_t>(0)->build());

// OPC data type of the written value. The allowed values are not listed here:
// initialize() fills them in from opc::StringToOPCDataTypeMap.
core::Property PutOPCProcessor::ValueType(
    core::PropertyBuilder::createProperty("Value type")
        ->withDescription("Set the OPC value type of the created nodes")
        ->isRequired(true)->build());

// Per-flow-file ID type of the target node ("Int" or "String", checked in onTrigger()).
core::Property PutOPCProcessor::TargetNodeIDType(
    core::PropertyBuilder::createProperty("Target node ID type")
        ->withDescription("ID type of target node. Allowed values are: Int, String.")
        ->supportsExpressionLanguage(true)->build());

// Per-flow-file ID of the target node.
core::Property PutOPCProcessor::TargetNodeID(
    core::PropertyBuilder::createProperty("Target node ID")
        ->withDescription("ID of target node.")
        ->supportsExpressionLanguage(true)->build());

// Per-flow-file namespace index of the target node.
core::Property PutOPCProcessor::TargetNodeNameSpaceIndex(
    core::PropertyBuilder::createProperty("Target node namespace index")
        ->withDescription("The index of the namespace. Used only if node ID type is not path.")
        ->supportsExpressionLanguage(true)->build());

// Per-flow-file browse name; onTrigger() requires it only when a new node has to be created.
core::Property PutOPCProcessor::TargetNodeBrowseName(
    core::PropertyBuilder::createProperty("Target node browse name")
        ->withDescription("Browse name of target node. Only used when new node is created.")
        ->supportsExpressionLanguage(true)->build());

// ---------------------------------------------------------------------------
// Relationships: every flow file taken by onTrigger() is routed to one of these.
// ---------------------------------------------------------------------------
core::Relationship PutOPCProcessor::Success("success", "Successfully put OPC-UA node");
core::Relationship PutOPCProcessor::Failure("failure", "Failed to put OPC-UA node");
void PutOPCProcessor::initialize() {
PutOPCProcessor::ValueType.clearAllowedValues();
core::PropertyValue pv;
for(const auto& kv : opc::StringToOPCDataTypeMap) {
pv = kv.first;
PutOPCProcessor::ValueType.addAllowedValue(pv);
}
std::set<core::Property> putOPCProperties = {ParentNodeID, ParentNodeIDType, ParentNameSpaceIndex, ValueType, TargetNodeIDType, TargetNodeID, TargetNodeNameSpaceIndex, TargetNodeBrowseName};
std::set<core::Property> baseOPCProperties = BaseOPCProcessor::getSupportedProperties();
putOPCProperties.insert(baseOPCProperties.begin(), baseOPCProperties.end());
setSupportedProperties(putOPCProperties);
// Set the supported relationships
setSupportedRelationships({Success, Failure});
}
/**
 * Reads and validates the static (non flow-file dependent) configuration.
 *
 * Runs the base class scheduling first and gives up immediately if that left
 * configOK_ false. Otherwise configOK_ is cleared and only set back to true
 * once every check below has passed, so onTrigger() can rely on it.
 *
 * @param context  source of the configured property values
 * @param factory  forwarded unchanged to BaseOPCProcessor::onSchedule
 *
 * Note: the final data type lookup uses map::at() and therefore throws if the
 * configured value type is unknown. The allowed values are generated from the
 * very same map in initialize(), so that would be a really unexpected error.
 */
void PutOPCProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
  logger_->log_trace("PutOPCProcessor::onSchedule");

  // Force onTrigger() to look the parent node up again after every (re)schedule.
  parentExists_ = false;

  BaseOPCProcessor::onSchedule(context, factory);
  if (!configOK_) {
    return;
  }
  // Pessimistic until proven otherwise: every early return below leaves this false.
  configOK_ = false;

  context->getProperty(OPCServerEndPoint.getName(), endPointURL_);

  std::string value;
  context->getProperty(ParentNodeID.getName(), nodeID_);
  context->getProperty(ParentNodeIDType.getName(), value);

  if (value == "String") {
    idType_ = opc::OPCNodeIDType::String;
  } else if (value == "Int") {
    idType_ = opc::OPCNodeIDType::Int;
  } else if (value == "Path") {
    idType_ = opc::OPCNodeIDType::Path;
  } else {
    // Where have our validators gone?
    logger_->log_error("%s is not a valid node ID type!", value.c_str());
    // Do not carry on with whatever idType_ happened to hold before.
    return;
  }

  if (idType_ == opc::OPCNodeIDType::Int) {
    // Only checks that the ID is parseable; the numeric value itself is
    // produced again in onTrigger() when the parent node ID is assembled.
    try {
      static_cast<void>(std::stoi(nodeID_));
    } catch (...) {
      logger_->log_error("%s cannot be used as an int type node ID", nodeID_.c_str());
      return;
    }
  }

  if (idType_ != opc::OPCNodeIDType::Path) {
    if (!context->getProperty(ParentNameSpaceIndex.getName(), nameSpaceIdx_)) {
      logger_->log_error("%s is mandatory in case %s is not Path", ParentNameSpaceIndex.getName().c_str(), ParentNodeIDType.getName().c_str());
      return;
    }
  }

  std::string typestr;
  context->getProperty(ValueType.getName(), typestr);
  nodeDataType_ = opc::StringToOPCDataTypeMap.at(typestr);  // This throws, but allowed values are generated based on this map -> that's a really unexpected error

  configOK_ = true;
}
/**
 * Writes the content of one incoming flow file to an OPC-UA node.
 *
 * Steps, in order:
 *  1. Bail out (and yield) if onSchedule() did not leave configOK_ set.
 *  2. Take onTriggerMutex_ without blocking; a concurrent invocation is skipped.
 *  3. Call reconnect(); yield if it reports failure.
 *  4. On first use after scheduling, resolve and verify the parent node and
 *     cache the result in parentNodeID_ / parentExists_.
 *  5. Fetch one flow file; evaluate the optional "Target node ..." properties
 *     against it to build the target node ID.
 *  6. Read the flow file content, convert it to the configured nodeDataType_
 *     and either update the existing target node or create a new node below
 *     the parent node.
 *
 * Every flow file taken from the session is transferred to Success or Failure.
 *
 * @param context  used to evaluate the per-flow-file properties
 * @param session  used to fetch, read and route the flow file
 */
void PutOPCProcessor::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  if (!configOK_) {
    logger_->log_error(
        "This processor was not configured properly, yielding. Please check for previous errors in the logs!");
    yield();
    return;
  }

  logger_->log_trace("PutOPCProcessor::onTrigger");

  // Non-blocking: if another invocation is still running, skip this one.
  std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
  if (!lock.owns_lock()) {
    logger_->log_warn("processor was triggered before previous listing finished, configuration should be revised!");
    return;
  }

  if (!reconnect()) {
    yield();
    return;
  }

  // Resolve the parent node once; the result is cached until the next onSchedule().
  if (!parentExists_) {
    if (idType_ == opc::OPCNodeIDType::Path) {
      std::vector<UA_NodeId> translatedNodeIDs;
      if (connection_->translateBrowsePathsToNodeIdsRequest(nodeID_, translatedNodeIDs, logger_) !=
          UA_STATUSCODE_GOOD) {
        logger_->log_error("Failed to translate %s to node id, no flow files will be put", nodeID_.c_str());
        yield();
        return;
      }
      if (translatedNodeIDs.empty()) {
        // A successful request without any match is not "multiple node ids": report it as such.
        logger_->log_error("%s could not be translated to any node id, no flow files will be put", nodeID_.c_str());
        yield();
        return;
      }
      if (translatedNodeIDs.size() != 1) {
        logger_->log_error("%s was translated to multiple node ids, no flow files will be put", nodeID_.c_str());
        yield();
        return;
      }
      parentNodeID_ = translatedNodeIDs[0];
      parentExists_ = true;
    } else {
      parentNodeID_.namespaceIndex = nameSpaceIdx_;
      if (idType_ == opc::OPCNodeIDType::Int) {
        parentNodeID_.identifierType = UA_NODEIDTYPE_NUMERIC;
        // onSchedule() already verified that nodeID_ parses as an integer.
        parentNodeID_.identifier.numeric = std::stoi(nodeID_);
      } else if (idType_ == opc::OPCNodeIDType::String) {
        parentNodeID_.identifierType = UA_NODEIDTYPE_STRING;
        // NOTE(review): this allocation is repeated on every trigger for as long as the
        // parent node is missing and is not released in this function - confirm whether
        // parentNodeID_ is cleaned up elsewhere.
        parentNodeID_.identifier.string = UA_STRING_ALLOC(nodeID_.c_str());
      }
      if (!connection_->exists(parentNodeID_)) {
        logger_->log_error("Parent node doesn't exist, no flow files will be put");
        yield();
        return;
      }
      parentExists_ = true;
    }
  }

  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());

  // Do nothing if there are no incoming files
  if (!flowFile) {
    return;
  }

  std::string targetidtype;

  bool targetNodeExists = false;
  bool targetNodeValid = false;
  UA_NodeId targetnode;

  // The target node is optional: it is only assembled when an ID type was given,
  // in which case the ID and the namespace index become mandatory as well.
  if (context->getProperty(TargetNodeIDType, targetidtype, flowFile)) {
    std::string targetid;
    std::string namespaceidx;

    if (!context->getProperty(TargetNodeID, targetid, flowFile)) {
      logger_->log_error("Flowfile %s had target node ID type specified (%s) without ID, routing to failure!",
                         flowFile->getUUIDStr(), targetidtype);
      session->transfer(flowFile, Failure);
      return;
    }

    if (!context->getProperty(TargetNodeNameSpaceIndex, namespaceidx, flowFile)) {
      logger_->log_error(
          "Flowfile %s had target node ID type specified (%s) without namespace index, routing to failure!",
          flowFile->getUUIDStr(), targetidtype);
      session->transfer(flowFile, Failure);
      return;
    }

    int32_t nsi;
    try {
      nsi = std::stoi(namespaceidx);
    } catch (...) {
      logger_->log_error("Flowfile %s has invalid namespace index (%s), routing to failure!",
                         flowFile->getUUIDStr(), namespaceidx);
      session->transfer(flowFile, Failure);
      return;
    }

    targetnode.namespaceIndex = nsi;
    if (targetidtype == "Int") {
      targetnode.identifierType = UA_NODEIDTYPE_NUMERIC;
      try {
        targetnode.identifier.numeric = std::stoi(targetid);
        targetNodeValid = true;
      } catch (...) {
        logger_->log_error("Flowfile %s: target node ID is not a valid integer: %s. Routing to failure!",
                           flowFile->getUUIDStr(), targetid);
        session->transfer(flowFile, Failure);
        return;
      }
    } else if (targetidtype == "String") {
      targetnode.identifierType = UA_NODEIDTYPE_STRING;
      // NOTE(review): allocated once per flow file and not released anywhere in this
      // function - looks like a leak unless the connection takes ownership; confirm.
      targetnode.identifier.string = UA_STRING_ALLOC(targetid.c_str());
      targetNodeValid = true;
    } else {
      logger_->log_error("Flowfile %s: target node ID type is invalid: %s. Routing to failure!",
                         flowFile->getUUIDStr(), targetidtype);
      session->transfer(flowFile, Failure);
      return;
    }
    targetNodeExists = connection_->exists(targetnode);
  }

  // Pull the whole flow file content into a string; it is parsed according to nodeDataType_ below.
  ReadCallback cb(logger_);
  session->read(flowFile, &cb);

  const auto &vec = cb.getContent();

  std::string contentstr(reinterpret_cast<const char *>(vec.data()), vec.size());

  // Parses an unsigned 32 bit value. Parsing goes through a 64 bit intermediate so that
  // out-of-range (and negative, which wrap around to huge values) content is rejected
  // instead of being silently truncated. Throws on any failure; callers catch it below.
  const auto parseUInt32 = [](const std::string &text) -> uint32_t {
    const uint64_t parsed = std::stoull(text);
    if (parsed > 0xFFFFFFFFULL) {
      throw opc::OPCException(GENERAL_EXCEPTION, "Content is out of range for UInt32");
    }
    return static_cast<uint32_t>(parsed);
  };

  if (targetNodeExists) {
    logger_->log_trace("Node exists, trying to update it");
    try {
      UA_StatusCode sc;
      switch (nodeDataType_) {
        case opc::OPCNodeDataType::Int64: {
          int64_t value = std::stoll(contentstr);
          sc = connection_->update_node(targetnode, value);
          break;
        }
        case opc::OPCNodeDataType::UInt64: {
          uint64_t value = std::stoull(contentstr);
          sc = connection_->update_node(targetnode, value);
          break;
        }
        case opc::OPCNodeDataType::Int32: {
          int32_t value = std::stoi(contentstr);
          sc = connection_->update_node(targetnode, value);
          break;
        }
        case opc::OPCNodeDataType::UInt32: {
          uint32_t value = parseUInt32(contentstr);
          sc = connection_->update_node(targetnode, value);
          break;
        }
        case opc::OPCNodeDataType::Boolean: {
          bool value;
          if (utils::StringUtils::StringToBool(contentstr, value)) {
            sc = connection_->update_node(targetnode, value);
          } else {
            throw opc::OPCException(GENERAL_EXCEPTION, "Content cannot be converted to bool");
          }
          break;
        }
        case opc::OPCNodeDataType::Float: {
          float value = std::stof(contentstr);
          sc = connection_->update_node(targetnode, value);
          break;
        }
        case opc::OPCNodeDataType::Double: {
          double value = std::stod(contentstr);
          sc = connection_->update_node(targetnode, value);
          break;
        }
        case opc::OPCNodeDataType::String: {
          sc = connection_->update_node(targetnode, contentstr);
          break;
        }
        default:
          throw opc::OPCException(GENERAL_EXCEPTION, "This should never happen!");
      }
      if (sc != UA_STATUSCODE_GOOD) {
        logger_->log_error("Failed to update node: %s", UA_StatusCode_name(sc));
        session->transfer(flowFile, Failure);
        return;
      }
    } catch (...) {
      // Any exception raised above (parse failure, range error, OPCException) ends up here.
      std::string typestr;
      context->getProperty(ValueType.getName(), typestr);
      logger_->log_error("Failed to convert %s to data type %s", contentstr, typestr);
      session->transfer(flowFile, Failure);
      return;
    }
    logger_->log_trace("Node successfully updated!");
    session->transfer(flowFile, Success);
    return;
  } else {
    logger_->log_trace("Node doesn't exist, trying to create new node");
    std::string browsename;
    if (!context->getProperty(TargetNodeBrowseName, browsename, flowFile)) {
      logger_->log_error("Target node browse name is required for flowfile (%s) as new node is to be created",
                         flowFile->getUUIDStr());
      session->transfer(flowFile, Failure);
      return;
    }
    if (!targetNodeValid) {
      // No usable target ID was configured for this flow file: request numeric ID 0 in namespace 1.
      // NOTE(review): presumably this lets the server pick the node ID - confirm.
      targetnode = UA_NODEID_NUMERIC(1, 0);
    }
    try {
      UA_StatusCode sc;
      UA_NodeId resultnode;
      switch (nodeDataType_) {
        case opc::OPCNodeDataType::Int64: {
          int64_t value = std::stoll(contentstr);
          sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, nodeDataType_, &resultnode);
          break;
        }
        case opc::OPCNodeDataType::UInt64: {
          uint64_t value = std::stoull(contentstr);
          sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, nodeDataType_, &resultnode);
          break;
        }
        case opc::OPCNodeDataType::Int32: {
          int32_t value = std::stoi(contentstr);
          sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, nodeDataType_, &resultnode);
          break;
        }
        case opc::OPCNodeDataType::UInt32: {
          uint32_t value = parseUInt32(contentstr);
          sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, nodeDataType_, &resultnode);
          break;
        }
        case opc::OPCNodeDataType::Boolean: {
          bool value;
          if (utils::StringUtils::StringToBool(contentstr, value)) {
            sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, nodeDataType_, &resultnode);
          } else {
            throw opc::OPCException(GENERAL_EXCEPTION, "Content cannot be converted to bool");
          }
          break;
        }
        case opc::OPCNodeDataType::Float: {
          float value = std::stof(contentstr);
          sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, nodeDataType_, &resultnode);
          break;
        }
        case opc::OPCNodeDataType::Double: {
          double value = std::stod(contentstr);
          sc = connection_->add_node(parentNodeID_, targetnode, browsename, value, nodeDataType_, &resultnode);
          break;
        }
        case opc::OPCNodeDataType::String: {
          sc = connection_->add_node(parentNodeID_, targetnode, browsename, contentstr, nodeDataType_, &resultnode);
          break;
        }
        default:
          throw opc::OPCException(GENERAL_EXCEPTION, "This should never happen!");
      }
      if (sc != UA_STATUSCODE_GOOD) {
        logger_->log_error("Failed to create node: %s", UA_StatusCode_name(sc));
        session->transfer(flowFile, Failure);
        return;
      }
    } catch (...) {
      // Any exception raised above (parse failure, range error, OPCException) ends up here.
      std::string typestr;
      context->getProperty(ValueType.getName(), typestr);
      logger_->log_error("Failed to convert %s to data type %s", contentstr, typestr);
      session->transfer(flowFile, Failure);
      return;
    }
    logger_->log_trace("Node successfully created!");
    session->transfer(flowFile, Success);
    return;
  }
}
/**
 * Copies the content of the given stream into buf_.
 *
 * The buffer is sized from stream->getSize() and filled in chunks of at most
 * 1024 bytes. Each read request is limited to the space still left in the
 * buffer, and after a short read the buffer is trimmed to the bytes actually
 * received so that no zero padding is left at its end.
 *
 * @param stream  stream to read the content from
 * @return number of bytes read, or -1 if the stream reported a read error
 */
int64_t PutOPCProcessor::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
  const uint64_t kChunkSize = 1024;
  const uint64_t expected = stream->getSize();

  buf_.clear();
  buf_.resize(expected);

  uint64_t size = 0;
  while (size < expected) {
    // Never ask for more than what still fits into the buffer.
    const uint64_t remaining = expected - size;
    const int toRead = static_cast<int>(remaining < kChunkSize ? remaining : kChunkSize);

    const int bytesRead = stream->read(buf_.data() + size, toRead);
    if (bytesRead < 0) {
      return -1;
    }
    if (bytesRead == 0) {
      // The stream ended earlier than its reported size.
      break;
    }
    size += bytesRead;
  }

  // Drop the unused tail in case fewer bytes than expected were delivered.
  buf_.resize(size);

  logger_->log_trace("Read %llu bytes from flowfile content to buffer", static_cast<unsigned long long>(size));
  return static_cast<int64_t>(size);
}
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */