blob: 4fb42b37cfd24e0c72d508c6e4a6bab709cb9ab4 [file] [log] [blame]
/**
* BaseOPC class definition
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>
#include <string>
#include "opc.h"
#include "opcbase.h"
#include "FlowFileRecord.h"
#include "core/Processor.h"
#include "core/ProcessSession.h"
#include "core/Core.h"
#include "core/Property.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
core::Property BaseOPCProcessor::OPCServerEndPoint(
core::PropertyBuilder::createProperty("OPC server endpoint")
->withDescription("Specifies the address, port and relative path of an OPC endpoint")
->isRequired(true)->build());
core::Property BaseOPCProcessor::ApplicationURI(
core::PropertyBuilder::createProperty("Application URI")
->withDescription("Application URI of the client in the format 'urn:unconfigured:application'. "
"Mandatory, if using Secure Channel and must match the URI included in the certificate's Subject Alternative Names.")->build());
core::Property BaseOPCProcessor::Username(
core::PropertyBuilder::createProperty("Username")
->withDescription("Username to log in with.")->build());
core::Property BaseOPCProcessor::Password(
core::PropertyBuilder::createProperty("Password")
->withDescription("Password to log in with. Providing this requires cert and key to be provided as well, credentials are always sent encrypted.")->build());
core::Property BaseOPCProcessor::CertificatePath(
core::PropertyBuilder::createProperty("Certificate path")
->withDescription("Path to the DER-encoded cert file")->build());
core::Property BaseOPCProcessor::KeyPath(
core::PropertyBuilder::createProperty("Key path")
->withDescription("Path to the DER-encoded key file")->build());
core::Property BaseOPCProcessor::TrustedPath(
core::PropertyBuilder::createProperty("Trusted server certificate path")
->withDescription("Path to the DER-encoded trusted server certificate")->build());
void BaseOPCProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
logger_->log_trace("BaseOPCProcessor::onSchedule");
applicationURI_.clear();
certBuffer_.clear();
keyBuffer_.clear();
password_.clear();
username_.clear();
trustBuffers_.clear();
configOK_ = false;
context->getProperty(OPCServerEndPoint.getName(), endPointURL_);
context->getProperty(ApplicationURI.getName(), applicationURI_);
if (context->getProperty(Username.getName(), username_) != context->getProperty(Password.getName(), password_)) {
logger_->log_error("Both or neither of Username and Password should be provided!");
return;
}
auto certificatePathRes = context->getProperty(CertificatePath.getName(), certpath_);
auto keyPathRes = context->getProperty(KeyPath.getName(), keypath_);
auto trustedPathRes = context->getProperty(TrustedPath.getName(), trustpath_);
if (certificatePathRes != keyPathRes || keyPathRes != trustedPathRes) {
logger_->log_error("All or none of Certificate path, Key path and Trusted server certificate path should be provided!");
return;
}
if (!password_.empty() && (certpath_.empty() || keypath_.empty() || trustpath_.empty() || applicationURI_.empty())) {
logger_->log_error("Certificate path, Key path, Trusted server certificate path and Application URI must be provided in case Password is provided!");
return;
}
if (!certpath_.empty()) {
if (applicationURI_.empty()) {
logger_->log_error("Application URI must be provided if Certificate path is provided!");
return;
}
std::ifstream input_cert(certpath_, std::ios::binary);
if (input_cert.good()) {
certBuffer_ = std::vector<char>(std::istreambuf_iterator<char>(input_cert), {});
}
std::ifstream input_key(keypath_, std::ios::binary);
if (input_key.good()) {
keyBuffer_ = std::vector<char>(std::istreambuf_iterator<char>(input_key), {});
}
trustBuffers_.emplace_back();
std::ifstream input_trust(trustpath_, std::ios::binary);
if (input_trust.good()) {
trustBuffers_[0] = std::vector<char>(std::istreambuf_iterator<char>(input_trust), {});
}
if (certBuffer_.empty()) {
logger_->log_error("Failed to load cert from path: %s", certpath_);
return;
}
if (keyBuffer_.empty()) {
logger_->log_error("Failed to load key from path: %s", keypath_);
return;
}
if (trustBuffers_[0].empty()) {
logger_->log_error("Failed to load trusted server certs from path: %s", trustpath_);
return;
}
}
configOK_ = true;
}
bool BaseOPCProcessor::reconnect() {
if (connection_ == nullptr) {
connection_ = opc::Client::createClient(logger_, applicationURI_, certBuffer_, keyBuffer_, trustBuffers_);
}
if (connection_->isConnected()) {
return true;
}
auto sc = connection_->connect(endPointURL_, username_, password_);
if (sc != UA_STATUSCODE_GOOD) {
logger_->log_error("Failed to connect: %s!", UA_StatusCode_name(sc));
return false;
}
logger_->log_debug("Successfully connected.");
return true;
}
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */