blob: 8db15a13e6f8ee76c74f36c18708675be2eaba72 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "MQTTC2Protocol.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace c2 {
MQTTC2Protocol::MQTTC2Protocol(std::string name, utils::Identifier uuid)
: C2Protocol(name, uuid),
logger_(logging::LoggerFactory<Connectable>::getLogger()) {
}
MQTTC2Protocol::~MQTTC2Protocol() = default;
void MQTTC2Protocol::initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<Configure> &configure) {
if (configure->get("nifi.c2.mqtt.connector.service", controller_service_name_)) {
auto service = controller->getControllerService(controller_service_name_);
mqtt_service_ = std::static_pointer_cast<controllers::MQTTControllerService>(service);
} else
mqtt_service_ = nullptr;
agent_identifier_ = configure->getAgentIdentifier();
std::stringstream outputStream;
std::string updateTopicOpt, heartbeatTopicOpt;
if (configure->get("nifi.c2.mqtt.heartbeat.topic", heartbeatTopicOpt)) {
heartbeat_topic_ = heartbeatTopicOpt;
} else {
heartbeat_topic_ = "heartbeats"; // outputStream.str();
}
if (configure->get("nifi.c2.mqtt.update.topic", updateTopicOpt)) {
update_topic_ = updateTopicOpt;
} else {
update_topic_ = "updates";
}
std::stringstream inputStream;
inputStream << agent_identifier_ << "/in";
in_topic_ = inputStream.str();
if (mqtt_service_) {
mqtt_service_->subscribeToTopic(in_topic_);
}
}
C2Payload MQTTC2Protocol::consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) {
// we are getting an update.
std::lock_guard<std::mutex> lock(input_mutex_);
io::BufferStream stream;
stream.write(in_topic_);
stream.write(url);
std::vector<uint8_t> response;
auto transmit_id = mqtt_service_->send(update_topic_, stream.getBuffer(), stream.size());
if (transmit_id > 0 && mqtt_service_->awaitResponse(5000, transmit_id, in_topic_, response)) {
C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true);
response_payload.setRawData(response);
return response_payload;
} else {
return C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
}
}
C2Payload MQTTC2Protocol::serialize(const C2Payload &payload) {
if (mqtt_service_ == nullptr || !mqtt_service_->isRunning()) {
return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
}
std::lock_guard<std::mutex> lock(input_mutex_);
auto stream = c2::PayloadSerializer::serialize(0x00, payload);
auto transmit_id = mqtt_service_->send(heartbeat_topic_, stream->getBuffer(), stream->size());
std::vector<uint8_t> response;
if (transmit_id > 0 && mqtt_service_->awaitResponse(5000, transmit_id, in_topic_, response)) {
return c2::PayloadSerializer::deserialize(response);
}
return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
}
} /* namespace c2 */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */