blob: 7d7c7101c535b49da123e4e2c409f98cef03b0db [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>
#include <string>
#include "opc.h"
#include "opcbase.h"
#include "FlowFileRecord.h"
#include "core/ProcessSession.h"
#include "core/Core.h"
#include "utils/ProcessorConfigUtils.h"
namespace org::apache::nifi::minifi::processors {
void BaseOPCProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
logger_->log_trace("BaseOPCProcessor::onSchedule");
application_uri_.clear();
cert_buffer_.clear();
key_buffer_.clear();
password_.clear();
username_.clear();
trust_buffers_.clear();
endpoint_url_ = context.getProperty(OPCServerEndPoint).value_or("");
application_uri_ = context.getProperty(ApplicationURI).value_or("");
username_ = context.getProperty(Username).value_or("");
password_ = context.getProperty(Password).value_or("");
if (username_.empty() != password_.empty()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Both or neither of Username and Password should be provided!");
}
certpath_ = context.getProperty(CertificatePath).value_or("");
keypath_ = context.getProperty(KeyPath).value_or("");
trustpath_ = context.getProperty(TrustedPath).value_or("");
if (certpath_.empty() != keypath_.empty()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "All or none of Certificate path and Key path should be provided!");
}
if (certpath_.empty()) {
return;
}
if (application_uri_.empty()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Application URI must be provided if Certificate path is provided!");
}
std::ifstream input_cert(certpath_, std::ios::binary);
if (input_cert.good()) {
cert_buffer_ = std::vector<char>(std::istreambuf_iterator<char>(input_cert), {});
}
std::ifstream input_key(keypath_, std::ios::binary);
if (input_key.good()) {
key_buffer_ = std::vector<char>(std::istreambuf_iterator<char>(input_key), {});
}
if (cert_buffer_.empty()) {
auto error_msg = utils::string::join_pack("Failed to load cert from path: ", certpath_);
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
if (key_buffer_.empty()) {
auto error_msg = utils::string::join_pack("Failed to load key from path: ", keypath_);
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
auto trusted_cert_paths = utils::string::splitAndTrimRemovingEmpty(trustpath_, ",");
for (const auto& trust_path : trusted_cert_paths) {
std::ifstream input_trust(trust_path, std::ios::binary);
if (input_trust.good()) {
trust_buffers_.push_back(std::vector<char>(std::istreambuf_iterator<char>(input_trust), {}));
} else {
auto error_msg = utils::string::join_pack("Failed to load trusted server certs from path: ", trust_path);
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
}
bool BaseOPCProcessor::reconnect() {
if (connection_ == nullptr) {
connection_ = opc::Client::createClient(logger_, application_uri_, cert_buffer_, key_buffer_, trust_buffers_);
}
if (connection_->isConnected()) {
return true;
}
auto sc = connection_->connect(endpoint_url_, username_, password_);
if (sc != UA_STATUSCODE_GOOD) {
logger_->log_error("Failed to connect: {}!", UA_StatusCode_name(sc));
return false;
}
logger_->log_debug("Successfully connected.");
return true;
}
void BaseOPCProcessor::readPathReferenceTypes(core::ProcessContext& context, const std::string& node_id) {
const auto value = context.getProperty(PathReferenceTypes).value_or("");
if (value.empty()) {
return;
}
auto path_reference_types = utils::string::splitAndTrimRemovingEmpty(value, "/");
if (path_reference_types.size() + 1 != utils::string::splitAndTrimRemovingEmpty(node_id, "/").size()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Path reference types must be provided for each node pair in the path!");
}
for (const auto& reference_type : path_reference_types) {
if (auto ua_ref_type = opc::mapOpcReferenceType(reference_type)) {
path_reference_types_.push_back(*ua_ref_type);
} else {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Unsupported reference type set in 'Path reference types' property: '{}'.", reference_type));
}
}
}
void BaseOPCProcessor::parseIdType(core::ProcessContext& context, const core::PropertyReference& prop) {
id_type_ = utils::parseEnumProperty<opc::OPCNodeIDType>(context, prop);
if (id_type_ == opc::OPCNodeIDType::Int) {
try {
static_cast<void>(std::stoi(node_id_));
} catch(const std::exception&) {
auto error_msg = utils::string::join_pack(node_id_, " cannot be used as an int type node ID");
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
}
} // namespace org::apache::nifi::minifi::processors