blob: f51d5a31f742a1a53d1f9511883b4e80a907043c [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "CoapC2Protocol.h"
#include "c2/PayloadSerializer.h"
#include "c2/PayloadParser.h"
#include "coap_functions.h"
#include "coap_message.h"
#include "io/BaseStream.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace coap {
namespace c2 {
// Eight raw bytes that spell "register" in ASCII (0x72 'r', 0x65 'e', 0x67 'g', 0x69 'i', 0x73 's', 0x74 't', 0x65 'e', 0x72 'r').
// NOTE(review): presumably the marker that isRegistrationMessage() compares a server response against - confirm in the header.
uint8_t CoapProtocol::REGISTRATION_MSG[8] = { 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72 };
CoapProtocol::CoapProtocol(const std::string &name, const utils::Identifier &uuid)
: RESTSender(name, uuid),
require_registration_(false),
logger_(logging::LoggerFactory<CoapProtocol>::getLogger()) {
}
// Defaulted destructor: only the compiler-generated member and base-class destruction runs.
CoapProtocol::~CoapProtocol() = default;
/**
 * Initializes the RESTSender base and binds the CoAP connector service used by serialize().
 *
 * If the property "nifi.c2.coap.connector.service" is set, the controller service with that
 * name is looked up through the supplied provider. Otherwise a default CoapConnectorService
 * named "cs" is created from the configuration and enabled.
 *
 * When a service name is configured but cannot be resolved to a CoapConnectorService (no
 * provider, unknown name, or a service of a different type), coap_service_ is left null and an
 * error is logged. serialize() reports a read error while coap_service_ is null.
 *
 * @param controller provider used to resolve the configured controller service; may be null
 * @param configure  agent configuration
 */
void CoapProtocol::initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<Configure> &configure) {
  RESTSender::initialize(controller, configure);
  if (configure->get("nifi.c2.coap.connector.service", controller_service_name_)) {
    coap_service_ = nullptr;
    if (nullptr == controller) {
      logger_->log_error("Cannot look up CoAP connector service %s without a controller service provider", controller_service_name_);
      return;
    }
    auto service = controller->getControllerService(controller_service_name_);
    // Checked downcast: the name comes from configuration, so it may refer to a missing service
    // or to one of another type. An unchecked static cast would be undefined behaviour then.
    coap_service_ = std::dynamic_pointer_cast<coap::controllers::CoapConnectorService>(service);
    if (nullptr == coap_service_) {
      logger_->log_error("Controller service %s is not available or is not a CoAP connector service", controller_service_name_);
    }
  } else {
    logger_->log_info("No CoAP connector configured, so using default service");
    coap_service_ = std::make_shared<coap::controllers::CoapConnectorService>("cs", configure);
    coap_service_->onEnable();
  }
}
/**
 * Delegates payload consumption to RESTSender.
 *
 * The `async` argument is not forwarded: the base implementation is always
 * invoked with its async flag set to false.
 *
 * @param url       target URL passed through to RESTSender
 * @param payload   payload passed through to RESTSender
 * @param direction transfer direction passed through to RESTSender
 * @param async     ignored by this override
 * @return the payload returned by RESTSender::consumePayload
 */
minifi::c2::C2Payload CoapProtocol::consumePayload(const std::string &url, const minifi::c2::C2Payload &payload, minifi::c2::Direction direction, bool async) {
  const bool asyncFlagForBase = false;
  return RESTSender::consumePayload(url, payload, direction, asyncFlagForBase);
}
/**
 * Writes an acknowledgement body to the stream: the payload identifier as a
 * UTF string followed by a single status byte.
 *
 * Status byte mapping:
 *   0 - NESTED, INITIATE, FULLY_APPLIED, READ_COMPLETE (and any unlisted state)
 *   1 - NOT_APPLIED, PARTIALLY_APPLIED
 *   2 - READ_ERROR
 *   3 - SET_ERROR
 *
 * @param stream  destination stream
 * @param payload payload whose identifier and status are written
 * @return always 0; results of the stream writes are not inspected
 */
int CoapProtocol::writeAcknowledgement(io::BaseStream *stream, const minifi::c2::C2Payload &payload) {
  auto operationId = payload.getIdentifier();
  auto updateState = payload.getStatus().getState();
  stream->writeUTF(operationId);

  uint8_t wireStatus;
  switch (updateState) {
    case state::UpdateState::SET_ERROR:
      wireStatus = 3;
      break;
    case state::UpdateState::READ_ERROR:
      wireStatus = 2;
      break;
    case state::UpdateState::PARTIALLY_APPLIED:
    case state::UpdateState::NOT_APPLIED:
      wireStatus = 1;
      break;
    case state::UpdateState::READ_COMPLETE:
    case state::UpdateState::FULLY_APPLIED:
    case state::UpdateState::INITIATE:
    case state::UpdateState::NESTED:
    default:
      wireStatus = 0;
      break;
  }

  stream->write(&wireStatus, 1);
  return 0;
}
/**
 * Writes a heartbeat body to the stream.
 *
 * Layout, in write order:
 *   - device identifier (UTF string), agent identifier (UTF string)
 *   - flow-info presence flag (bool)
 *   - only when the flag is true:
 *       - component count (uint16_t), then per component: label (UTF string), running (bool)
 *       - queue count (uint16_t), then per queue: label (UTF string) and four uint64_t values
 *         (dataSize, dataSizeMax, size, sizeMax)
 *       - bucketId and flowId (UTF strings)
 *
 * Every flow-info lookup that can throw is resolved before the presence flag is written. A
 * missing "flowInfo" section or a missing bucketId/flowId therefore yields a single `false`
 * flag, never a `true` flag followed by a partially written section.
 *
 * @param stream  destination stream
 * @param payload heartbeat payload to encode
 * @return 0 on success; -1 if the device/agent identifiers cannot be parsed or the agent
 *         identifier is empty (the stream may then already hold the device identifier)
 */
int CoapProtocol::writeHeartbeat(io::BaseStream *stream, const minifi::c2::C2Payload &payload) {
  logger_->log_trace("Writing heartbeat");
  try {
    const std::string deviceIdent = minifi::c2::PayloadParser::getInstance(payload).in("deviceInfo").getAs<std::string>("identifier");
    const std::string agentIdent = minifi::c2::PayloadParser::getInstance(payload).in("agentInfo").getAs<std::string>("identifier");
    stream->writeUTF(deviceIdent, false);
    logger_->log_trace("Writing heartbeat with device Ident %s and agent Ident %s", deviceIdent, agentIdent);
    if (agentIdent.empty()) {
      return -1;
    }
    stream->writeUTF(agentIdent, false);
    try {
      auto flowInfoParser = minifi::c2::PayloadParser::getInstance(payload).in("flowInfo");
      auto componentParser = flowInfoParser.in("components");
      auto queueParser = flowInfoParser.in("queues");
      auto vfsParser = flowInfoParser.in("versionedFlowSnapshotURI");
      // Fetch these before anything is written: if either is missing the exception must be
      // raised while the stream still contains no flow-info bytes.
      auto bucketId = vfsParser.getAs<std::string>("bucketId");
      auto flowId = vfsParser.getAs<std::string>("flowId");

      bool hasFlowInfo = true;
      stream->write(hasFlowInfo);

      uint16_t componentCount = static_cast<uint16_t>(componentParser.getSize());
      stream->write(componentCount);
      componentParser.foreach([this, stream](const minifi::c2::C2Payload &component) {
        auto myParser = minifi::c2::PayloadParser::getInstance(component);
        bool running = false;
        stream->writeUTF(component.getLabel());
        try {
          running = myParser.getAs<bool>("running");
        } catch (const minifi::c2::PayloadParseException &) {
          // A component without a "running" field is reported as not running.
          logger_->log_error("Could not find running in components");
        }
        stream->write(running);
      });

      uint16_t queueCount = static_cast<uint16_t>(queueParser.getSize());
      stream->write(queueCount);
      queueParser.foreach([this, stream](const minifi::c2::C2Payload &component) {
        auto myParser = minifi::c2::PayloadParser::getInstance(component);
        stream->writeUTF(component.getLabel());
        uint64_t datasize = 0, datasizemax = 0, qsize = 0, sizemax = 0;
        try {
          datasize = myParser.getAs<uint64_t>("dataSize");
          datasizemax = myParser.getAs<uint64_t>("dataSizeMax");
          qsize = myParser.getAs<uint64_t>("size");
          sizemax = myParser.getAs<uint64_t>("sizeMax");
        } catch (const minifi::c2::PayloadParseException &) {
          // Values that could not be read keep their zero defaults; all four are still written.
          logger_->log_error("Could not find queue sizes");
        }
        stream->write(datasize);
        stream->write(datasizemax);
        stream->write(qsize);
        stream->write(sizemax);
      });

      stream->writeUTF(bucketId);
      stream->writeUTF(flowId);
    } catch (const minifi::c2::PayloadParseException &pe) {
      logger_->log_error("Parser exception occurred, but is ignorable, reason %s", pe.what());
      // okay to ignore: flow info is optional, so only the "absent" flag is written
      bool hasFlowInfo = false;
      stream->write(hasFlowInfo);
    }
  } catch (const minifi::c2::PayloadParseException &e) {
    logger_->log_error("Parser exception occurred, reason %s", e.what());
    return -1;
  }
  return 0;
}
/**
 * Translates a numeric operation code into a C2 operation.
 *
 * Codes 0 through 7 map, in order, to ACKNOWLEDGE, HEARTBEAT, CLEAR, DESCRIBE,
 * RESTART, START, UPDATE and STOP. Any other value yields ACKNOWLEDGE.
 *
 * @param type numeric operation code
 * @return the matching operation, or ACKNOWLEDGE for an unknown code
 */
minifi::c2::Operation CoapProtocol::getOperation(int type) const {
  // Indexed by operation code.
  static const minifi::c2::Operation kOperationByCode[] = {
    minifi::c2::ACKNOWLEDGE,
    minifi::c2::HEARTBEAT,
    minifi::c2::CLEAR,
    minifi::c2::DESCRIBE,
    minifi::c2::RESTART,
    minifi::c2::START,
    minifi::c2::UPDATE,
    minifi::c2::STOP
  };
  const int knownCodes = static_cast<int>(sizeof(kOperationByCode) / sizeof(kOperationByCode[0]));

  if (type >= 0 && type < knownCodes) {
    return kOperationByCode[type];
  }
  return minifi::c2::ACKNOWLEDGE;
}
/**
 * Encodes a C2 payload into a binary message, POSTs it through the CoAP connector service and
 * decodes the reply.
 *
 * Flow:
 *  1. Without a CoAP service, a READ_ERROR payload is returned.
 *  2. If registration was previously requested (require_registration_), the payload is sent via
 *     RESTSender::consumePayload to rest_uri_ and a READ_COMPLETE payload is returned. The flag
 *     is cleared only when that call did not report READ_ERROR.
 *  3. Otherwise a message is built: version (uint16_t, 0), a payload type byte (0 = acknowledge,
 *     1 = heartbeat), then the body from writeAcknowledgement()/writeHeartbeat(). Any other
 *     operation, or a failing writer, returns READ_ERROR.
 *  4. The message is POSTed to the "acknowledge" or "heartbeat" endpoint.
 *  5. If isRegistrationMessage() accepts the reply, require_registration_ is set and READ_ERROR
 *     is returned. A non-empty reply is decoded into a NESTED payload with one nested payload
 *     per received operation. An empty reply returns READ_ERROR.
 *
 * @param payload payload to send
 * @return the decoded response payload, or a status-only payload as described above
 */
minifi::c2::C2Payload CoapProtocol::serialize(const minifi::c2::C2Payload &payload) {
if (nullptr == coap_service_) {
// return an error if we don't have a coap service
logger_->log_error("CoAP service requested without any configured hostname or port");
return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
}
if (require_registration_) {
logger_->log_debug("Server requested agent registration, so attempting");
auto response = minifi::c2::RESTSender::consumePayload(rest_uri_, payload, minifi::c2::TRANSMIT, false);
if (response.getStatus().getState() == state::UpdateState::READ_ERROR) {
logger_->log_trace("Could not register");
// Returns before the flag is cleared below, so require_registration_ stays set and the next call retries.
return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
} else {
logger_->log_trace("Registered agent.");
}
require_registration_ = false;
return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
}
// version and size are reused below when decoding the response.
uint16_t version = 0;
uint8_t payload_type = 0;
// NOTE(review): payload_u64 is never referenced after this declaration.
uint64_t payload_u64 = 0;
uint16_t size = 0;
io::BaseStream stream;
// Message header: protocol version first, then the payload type byte written in the switch.
stream.write(version);
std::string endpoint = "heartbeat";
switch (payload.getOperation()) {
case minifi::c2::ACKNOWLEDGE:
endpoint = "acknowledge";
payload_type = 0;
stream.write(&payload_type, 1);
if (writeAcknowledgement(&stream, payload) != 0) {
return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
}
break;
case minifi::c2::HEARTBEAT:
payload_type = 1;
stream.write(&payload_type, 1);
if (writeHeartbeat(&stream, payload) != 0) {
logger_->log_error("Could not write heartbeat");
return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
}
break;
default:
logger_->log_error("Could not identify operation");
return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
};
size_t bsize = stream.getSize();
CoapMessage msg;
// msg points at the stream's buffer rather than a copy; constness is cast away to fit the data_ field.
msg.data_ = const_cast<uint8_t *>(stream.getBuffer());
msg.size_ = bsize;
coap::controllers::CoapResponse message = coap_service_->sendPayload(COAP_REQUEST_POST, endpoint, &msg);
if (isRegistrationMessage(message)) {
// Only records the request; this call then falls through to the READ_ERROR return at the end.
require_registration_ = true;
} else if (message.getSize() > 0) {
io::DataStream byteStream(message.getData(), message.getSize());
io::BaseStream responseStream(&byteStream);
// Response header: version followed by the number of operations. The results of these two reads are not checked.
responseStream.read(version);
responseStream.read(size);
logger_->log_trace("Received ack. version %d. number of operations %d", version, size);
minifi::c2::C2Payload new_payload(payload.getOperation(), state::UpdateState::NESTED, true);
for (int i = 0; i < size; i++) {
uint8_t operationType;
uint16_t argsize = 0;
std::string operand, id;
// NOTE(review): REQUIRE_SIZE_IF / REQUIRE_VALID are macros defined outside this file; presumably they return an error payload from this function when a read fails - confirm in the header.
REQUIRE_SIZE_IF(1, responseStream.read(operationType))
REQUIRE_VALID(responseStream.readUTF(id, false))
REQUIRE_VALID(responseStream.readUTF(operand, false))
logger_->log_trace("Received op %d, with id %s and operand %s", operationType, id, operand);
auto newOp = getOperation(operationType);
minifi::c2::C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE, true);
nested_payload.setIdentifier(id);
minifi::c2::C2ContentResponse new_command(newOp);
new_command.delay = 0;
new_command.required = true;
new_command.ttl = -1;
new_command.name = operand;
new_command.ident = id;
// Argument count for this operation; the result of this read is not checked, and argsize was initialised to 0 above.
responseStream.read(argsize);
for (int j = 0; j < argsize; j++) {
std::string key, value;
REQUIRE_VALID(responseStream.readUTF(key))
REQUIRE_VALID(responseStream.readUTF(value))
new_command.operation_arguments[key] = value;
}
nested_payload.addContent(std::move(new_command));
new_payload.addPayload(std::move(nested_payload));
}
return new_payload;
}
// Reached for a registration request or an empty response.
return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
}
} /* namespace c2 */
} /* namespace coap */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */