| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #ifndef LIBMINIFI_INCLUDE_CONTROLLERS_MQTTCONTEXTSERVICE_H_ |
| #define LIBMINIFI_INCLUDE_CONTROLLERS_MQTTCONTEXTSERVICE_H_ |
| |
#include <openssl/err.h>
#include <openssl/ssl.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

#include "core/Resource.h"
#include "utils/StringUtils.h"
#include "io/validation.h"
#include "core/controller/ControllerService.h"
#include "core/logging/LoggerConfiguration.h"
#include "controllers/SSLContextService.h"
#include "concurrentqueue.h"
#include "MQTTClient.h"
| |
| namespace org { |
| namespace apache { |
| namespace nifi { |
| namespace minifi { |
| namespace controllers { |
| |
| static constexpr const char* const MQTT_QOS_0 = "0"; |
| static constexpr const char* const MQTT_QOS_1 = "1"; |
| static constexpr const char* const MQTT_QOS_2 = "2"; |
| |
/**
 * Message pairs an MQTT topic with an owned copy of the payload bytes.
 *
 * The type is move-only: copying is disabled, and moving transfers the topic
 * string and the payload buffer without duplicating them.
 */
class Message {
 public:
  // empty constructor facilitates moves
  Message() = default;

  /**
   * Copies dataLen bytes starting at data into the message.
   * @param topic topic the payload belongs to.
   * @param data pointer to the first payload byte; only read, never retained.
   * @param dataLen number of bytes to copy from data.
   */
  explicit Message(const std::string &topic, void *data, size_t dataLen)
      : topic_(topic),
        data_(static_cast<uint8_t*>(data), static_cast<uint8_t*>(data) + dataLen) {
  }

  // A move constructor must take a non-const rvalue reference: with a
  // `const Message &&` parameter std::move() on the members produces const
  // rvalues, which select the copy constructors and deep-copy the payload.
  Message(Message &&other) noexcept
      : topic_(std::move(other.topic_)),
        data_(std::move(other.data_)) {
  }

  // Copying was already implicitly disabled by the user-declared move
  // operations; spell it out so the intent is visible.
  Message(const Message &other) = delete;
  Message &operator=(const Message &other) = delete;

  ~Message() = default;

  Message &operator=(Message &&other) noexcept {
    topic_ = std::move(other.topic_);
    data_ = std::move(other.data_);
    return *this;
  }

  std::string topic_;
  std::vector<uint8_t> data_;
};
| |
| /** |
| * MQTTContextService provides a controller service for MQTT connectivity. |
| * |
| */ |
| class MQTTControllerService : public core::controller::ControllerService { |
| public: |
| explicit MQTTControllerService(const std::string &name, const std::string &id) |
| : ControllerService(name, id), |
| initialized_(false), |
| client_(nullptr), |
| keepAliveInterval_(0), |
| connectionTimeOut_(0), |
| qos_(2), |
| ssl_context_service_(nullptr), |
| logger_(logging::LoggerFactory<MQTTControllerService>::getLogger()) { |
| } |
| |
| explicit MQTTControllerService(const std::string &name, utils::Identifier uuid = utils::Identifier()) |
| : ControllerService(name, uuid), |
| initialized_(false), |
| client_(nullptr), |
| keepAliveInterval_(0), |
| connectionTimeOut_(0), |
| qos_(2), |
| ssl_context_service_(nullptr), |
| logger_(logging::LoggerFactory<MQTTControllerService>::getLogger()) { |
| } |
| |
| explicit MQTTControllerService(const std::string &name, const std::shared_ptr<Configure> &configuration) |
| : ControllerService(name), |
| initialized_(false), |
| client_(nullptr), |
| keepAliveInterval_(0), |
| connectionTimeOut_(0), |
| qos_(2), |
| ssl_context_service_(nullptr), |
| logger_(logging::LoggerFactory<MQTTControllerService>::getLogger()) { |
| setConfiguration(configuration); |
| initialize(); |
| } |
| |
| static core::Property BrokerURL; |
| static core::Property ClientID; |
| static core::Property UserName; |
| static core::Property Password; |
| static core::Property CleanSession; |
| static core::Property KeepLiveInterval; |
| static core::Property ConnectionTimeOut; |
| static core::Property Topic; |
| static core::Property QOS; |
| static core::Property SecurityProtocol; |
| |
| virtual void initialize(); |
| |
| void yield() { |
| |
| } |
| |
| int send(const std::string &topic, const std::vector<uint8_t> &data) { |
| int token; |
| MQTTClient_message pubmsg = MQTTClient_message_initializer; |
| const uint8_t *d = data.data(); |
| pubmsg.payload = const_cast<uint8_t*>(d); |
| pubmsg.payloadlen = data.size(); |
| pubmsg.qos = qos_; |
| pubmsg.retained = 0; |
| |
| auto resp = MQTTClient_publishMessage(client_, topic.c_str(), &pubmsg, &token); |
| if (resp != MQTTCLIENT_SUCCESS) { |
| return -1; |
| } |
| if (qos_ == 0) { |
| std::unique_lock<std::mutex> lock(delivery_mutex_); |
| delivered_[token] = true; |
| } |
| return token; |
| } |
| |
| int send(const std::string &topic, const uint8_t *data, size_t dataSize) { |
| int token; |
| |
| MQTTClient_message pubmsg = MQTTClient_message_initializer; |
| pubmsg.payload = const_cast<uint8_t*>(data); |
| pubmsg.payloadlen = dataSize; |
| pubmsg.qos = qos_; |
| pubmsg.retained = 0; |
| |
| auto resp = MQTTClient_publishMessage(client_, topic.c_str(), &pubmsg, &token); |
| if (resp != MQTTCLIENT_SUCCESS) { |
| return -1; |
| } |
| |
| if (qos_ == 0) { |
| std::unique_lock<std::mutex> lock(delivery_mutex_); |
| delivered_[token] = true; |
| } |
| return token; |
| } |
| |
| bool isRunning() { |
| return getState() == core::controller::ControllerServiceState::ENABLED; |
| } |
| |
| bool isWorkAvailable() { |
| return false; |
| } |
| |
| virtual void onEnable(); |
| |
| void subscribeToTopic(const std::string newTopic) { |
| std::lock_guard<std::mutex> lock(initialization_mutex_); |
| if (topics_.find(newTopic) == topics_.end()) { |
| MQTTClient_subscribe(client_, newTopic.c_str(), qos_); |
| topics_[newTopic].size_approx(); |
| } |
| } |
| |
| bool waitForDelivery(const uint64_t millisToWait, int token) { |
| std::unique_lock<std::mutex> lock(delivery_mutex_); |
| if (delivery_notification_.wait_for(lock, std::chrono::milliseconds(millisToWait), [&] {return delivered_[token] == true;})) { |
| bool delivered = delivered_[token]; |
| delivered_.erase(token); |
| return delivered; |
| } else { |
| delivered_.erase(token); |
| return false; |
| } |
| } |
| |
| bool get(const uint64_t millisToWait, const std::string &topic, std::vector<uint8_t> &data) { |
| std::unique_lock<std::mutex> lock(delivery_mutex_); |
| if (delivery_notification_.wait_for(lock, std::chrono::milliseconds(millisToWait), [&] {return topics_[topic].size_approx() > 0;})) { |
| Message resp; |
| if (topics_[topic].try_dequeue(resp)) { |
| data = std::move(resp.data_); |
| return true; |
| } else { |
| return false; |
| } |
| } else { |
| return false; |
| } |
| } |
| |
| bool awaitResponse(const uint64_t millisToWait, int token, const std::string &topic, std::vector<uint8_t> &data) { |
| std::unique_lock<std::mutex> lock(delivery_mutex_); |
| if (delivery_notification_.wait_for(lock, std::chrono::milliseconds(millisToWait), [&] { |
| return |
| delivered_[token] == true; |
| })) { |
| bool delivered = delivered_[token]; |
| if (delivered) { |
| if (delivery_notification_.wait_for(lock, std::chrono::milliseconds(millisToWait), [&] {return topics_[topic].size_approx() > 0;})) { |
| Message resp; |
| if (topics_[topic].try_dequeue(resp)) { |
| data = std::move(resp.data_); |
| return true; |
| } else { |
| return false; |
| } |
| } else { |
| return false; |
| } |
| } |
| delivered_.erase(token); |
| return delivered; |
| } else { |
| delivered_.erase(token); |
| return false; |
| } |
| } |
| |
| protected: |
| |
| void acknowledgeDelivery(MQTTClient_deliveryToken token) { |
| std::lock_guard<std::mutex> lock(delivery_mutex_); |
| // locked the mutex |
| auto finder = delivered_.find(token); |
| // only acknowledge delivery if we expect the delivery to occur, otherwise |
| // we won't have any waiters. |
| if (finder != delivered_.end()) { |
| delivered_[token] = true; |
| } |
| } |
| |
| void enqueue(const std::string &topic, Message &&message) { |
| std::unique_lock<std::mutex> lock(delivery_mutex_); |
| topics_[topic].enqueue(std::move(message)); |
| delivery_notification_.notify_one(); |
| } |
| |
| static void deliveryCallback(void *context, MQTTClient_deliveryToken dt) { |
| MQTTControllerService *service = (MQTTControllerService *) context; |
| service->acknowledgeDelivery(dt); |
| } |
| |
| static int receiveCallback(void *context, char *topicName, int topicLen, MQTTClient_message *message) { |
| MQTTControllerService *service = (MQTTControllerService *) context; |
| std::string topic(topicName, topicLen == 0 ? strlen(topicName) : topicLen); |
| Message queueMessage(topic, message->payload, message->payloadlen); |
| service->enqueue(topic, std::move(queueMessage)); |
| MQTTClient_freeMessage(&message); |
| MQTTClient_free(topicName); |
| return 1; |
| } |
| static void reconnectCallback(void *context, char *cause) { |
| MQTTControllerService *service = (MQTTControllerService *) context; |
| service->reconnect(); |
| } |
| |
| bool reconnect() { |
| if (!client_) |
| return false; |
| if (MQTTClient_isConnected(client_)) |
| return true; |
| MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; |
| conn_opts.keepAliveInterval = keepAliveInterval_; |
| conn_opts.cleansession = 1; |
| if (!userName_.empty()) { |
| conn_opts.username = userName_.c_str(); |
| conn_opts.password = passWord_.c_str(); |
| } |
| if (ssl_context_service_ != nullptr) |
| conn_opts.ssl = &sslopts_; |
| if (MQTTClient_connect(client_, &conn_opts) != MQTTCLIENT_SUCCESS) { |
| logger_->log_error("Failed to connect to MQTT broker %s", uri_); |
| return false; |
| } |
| |
| if (!topic_.empty()) { |
| std::unique_lock<std::mutex> lock(delivery_mutex_); |
| MQTTClient_subscribe(client_, topic_.c_str(), qos_); |
| } |
| return true; |
| } |
| |
| virtual void initializeProperties(); |
| |
| std::mutex initialization_mutex_; |
| std::atomic<bool> initialized_; |
| |
| MQTTClient client_; |
| std::string uri_; |
| std::string topic_; |
| int64_t keepAliveInterval_; |
| int64_t connectionTimeOut_; |
| int64_t qos_; |
| std::string clientID_; |
| std::string userName_; |
| std::string passWord_; |
| |
| private: |
| |
| std::map<int, bool> delivered_; |
| std::map<std::string, moodycamel::ConcurrentQueue<Message> > topics_; |
| |
| std::mutex delivery_mutex_; |
| std::condition_variable delivery_notification_; |
| |
| MQTTClient_SSLOptions sslopts_; |
| |
| std::shared_ptr<controllers::SSLContextService> ssl_context_service_; |
| |
| std::shared_ptr<logging::Logger> logger_; |
| |
| }; |
| |
| } /* namespace controllers */ |
| } /* namespace minifi */ |
| } /* namespace nifi */ |
| } /* namespace apache */ |
| } /* namespace org */ |
| |
| #endif /* LIBMINIFI_INCLUDE_CONTROLLERS_MQTTCONTEXTSERVICE_H_ */ |