| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "c2/C2Agent.h" |
| #include <csignal> |
| #include <utility> |
| #include <limits> |
| #include <vector> |
| #include <map> |
| #include <string> |
| #include <memory> |
| #include "c2/ControllerSocketProtocol.h" |
| #include "core/state/UpdateController.h" |
| #include "core/logging/Logger.h" |
| #include "core/logging/LoggerConfiguration.h" |
| #include "utils/file/DiffUtils.h" |
| #include "utils/file/FileUtils.h" |
| #include "utils/file/FileManager.h" |
| #include "utils/HTTPClient.h" |
| namespace org { |
| namespace apache { |
| namespace nifi { |
| namespace minifi { |
| namespace c2 { |
| |
| C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, |
| const std::shared_ptr<Configure> &configuration) |
| : heart_beat_period_(3000), |
| max_c2_responses(5), |
| update_sink_(updateSink), |
| update_service_(nullptr), |
| controller_(controller), |
| configuration_(configuration), |
| protocol_(nullptr), |
| logger_(logging::LoggerFactory<C2Agent>::getLogger()) { |
| allow_updates_ = true; |
| |
| manifest_sent_ = false; |
| |
| running_c2_configuration = std::make_shared<Configure>(); |
| |
| last_run_ = std::chrono::steady_clock::now(); |
| |
| if (nullptr != controller_) { |
| update_service_ = std::static_pointer_cast<controllers::UpdatePolicyControllerService>(controller_->getControllerService(C2_AGENT_UPDATE_NAME)); |
| } |
| |
| if (update_service_ == nullptr) { |
| // create a stubbed service for updating the flow identifier |
| } |
| |
| configure(configuration, false); |
| |
| c2_producer_ = [&]() { |
| auto now = std::chrono::steady_clock::now(); |
| auto time_since = std::chrono::duration_cast<std::chrono::milliseconds>(now - last_run_).count(); |
| |
| // place priority on messages to send to the c2 server |
| if ( protocol_.load() != nullptr && request_mutex.try_lock_until(now + std::chrono::seconds(1)) ) { |
| if (requests.size() > 0) { |
| int count = 0; |
| do { |
| const C2Payload payload(std::move(requests.back())); |
| requests.pop_back(); |
| try { |
| C2Payload && response = protocol_.load()->consumePayload(payload); |
| enqueue_c2_server_response(std::move(response)); |
| } |
| catch(const std::exception &e) { |
| logger_->log_error("Exception occurred while consuming payload. error: %s", e.what()); |
| } |
| catch(...) { |
| logger_->log_error("Unknonwn exception occurred while consuming payload."); |
| } |
| }while(requests.size() > 0 && ++count < max_c2_responses); |
| } |
| request_mutex.unlock(); |
| } |
| |
| if ( time_since > heart_beat_period_ ) { |
| last_run_ = now; |
| try { |
| performHeartBeat(); |
| } |
| catch(const std::exception &e) { |
| logger_->log_error("Exception occurred while performing heartbeat. error: %s", e.what()); |
| } |
| catch(...) { |
| logger_->log_error("Unknonwn exception occurred while performing heartbeat."); |
| } |
| } |
| |
| checkTriggers(); |
| |
| std::this_thread::sleep_for(std::chrono::milliseconds(heart_beat_period_ > 500 ? 500 : heart_beat_period_)); |
| return state::Update(state::UpdateStatus(state::UpdateState::READ_COMPLETE, false)); |
| }; |
| |
| functions_.push_back(c2_producer_); |
| |
| c2_consumer_ = [&]() { |
| auto now = std::chrono::steady_clock::now(); |
| if ( queue_mutex.try_lock_until(now + std::chrono::seconds(1)) ) { |
| if (responses.size() > 0) { |
| const C2Payload payload(std::move(responses.back())); |
| responses.pop_back(); |
| extractPayload(std::move(payload)); |
| } |
| queue_mutex.unlock(); |
| } |
| return state::Update(state::UpdateStatus(state::UpdateState::READ_COMPLETE, false)); |
| }; |
| |
| functions_.push_back(c2_consumer_); |
| } |
| |
| void C2Agent::checkTriggers() { |
| logger_->log_debug("Checking %d triggers", triggers_.size()); |
| for (const auto &trigger : triggers_) { |
| if (trigger->triggered()) { |
| /** |
| * Action was triggered, so extract it. |
| */ |
| C2Payload &&triggerAction = trigger->getAction(); |
| logger_->log_trace("%s action triggered", trigger->getName()); |
| // handle the response the same way. This means that |
| // acknowledgements will be sent to the c2 server for every trigger action. |
| // this is expected |
| extractPayload(std::move(triggerAction)); |
| // call reset if the trigger supports this activity |
| trigger->reset(); |
| } else { |
| logger_->log_trace("%s action not triggered", trigger->getName()); |
| } |
| } |
| } |
| void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconfigure) { |
| std::string clazz, heartbeat_period, device; |
| |
| if (!reconfigure) { |
| if (!configure->get("nifi.c2.agent.protocol.class", "c2.agent.protocol.class", clazz)) { |
| clazz = "CoapProtocol"; |
| } |
| logger_->log_info("Class is %s", clazz); |
| |
| auto protocol = core::ClassLoader::getDefaultClassLoader().instantiateRaw(clazz, clazz); |
| if (protocol == nullptr) { |
| logger_->log_warn("Class %s not found", clazz); |
| protocol = core::ClassLoader::getDefaultClassLoader().instantiateRaw("CoapProtocol", "CoapProtocol"); |
| if (!protocol) { |
| const char* errmsg = "Attempted to load CoapProtocol. To enable C2, please specify an active protocol for this agent."; |
| logger_->log_error(errmsg); |
| throw minifi::Exception{ minifi::GENERAL_EXCEPTION, errmsg }; |
| } |
| |
| logger_->log_info("Class is CoapProtocol"); |
| } |
| |
| // Since !reconfigure, the call comes from the ctor and protocol_ is null, therefore no delete is necessary |
| protocol_.exchange(dynamic_cast<C2Protocol *>(protocol)); |
| |
| protocol_.load()->initialize(controller_, configuration_); |
| } else { |
| protocol_.load()->update(configure); |
| } |
| |
| if (configure->get("nifi.c2.agent.heartbeat.period", "c2.agent.heartbeat.period", heartbeat_period)) { |
| core::TimeUnit unit; |
| |
| try { |
| int64_t schedulingPeriod = 0; |
| if (core::Property::StringToTime(heartbeat_period, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToMS(schedulingPeriod, unit, schedulingPeriod)) { |
| heart_beat_period_ = schedulingPeriod; |
| logger_->log_debug("Using %u ms as the heartbeat period", heart_beat_period_); |
| } else { |
| heart_beat_period_ = std::stoi(heartbeat_period); |
| } |
| } catch (const std::invalid_argument &ie) { |
| heart_beat_period_ = 3000; |
| } |
| } else { |
| if (!reconfigure) |
| heart_beat_period_ = 3000; |
| } |
| |
| std::string update_settings; |
| if (configure->get("nifi.c2.agent.update.allow", "c2.agent.update.allow", update_settings) && utils::StringUtils::StringToBool(update_settings, allow_updates_)) { |
| // allow the agent to be updated. we then need to get an update command to execute after |
| } |
| |
| if (allow_updates_) { |
| if (!configure->get("nifi.c2.agent.update.command", "c2.agent.update.command", update_command_)) { |
| char cwd[1024]; |
| if (getcwd(cwd, sizeof(cwd)) == nullptr) { |
| logger_->log_error("Could not set update command, reason %s", std::strerror(errno)); |
| |
| } else { |
| std::stringstream command; |
| command << cwd << "/minifi.sh update"; |
| update_command_ = command.str(); |
| } |
| } |
| |
| if (!configure->get("nifi.c2.agent.update.temp.location", "c2.agent.update.temp.location", update_location_)) { |
| char cwd[1024]; |
| if (getcwd(cwd, sizeof(cwd)) == nullptr) { |
| logger_->log_error("Could not set copy path, reason %s", std::strerror(errno)); |
| } else { |
| std::stringstream copy_path; |
| std::stringstream command; |
| copy_path << cwd << "/minifi.update"; |
| } |
| } |
| |
| // if not defined we won't beable to update |
| configure->get("nifi.c2.agent.bin.location", "c2.agent.bin.location", bin_location_); |
| } |
| std::string heartbeat_reporters; |
| if (configure->get("nifi.c2.agent.heartbeat.reporter.classes", "c2.agent.heartbeat.reporter.classes", heartbeat_reporters)) { |
| std::vector<std::string> reporters = utils::StringUtils::split(heartbeat_reporters, ","); |
| std::lock_guard<std::mutex> lock(heartbeat_mutex); |
| for (const auto& reporter : reporters) { |
| auto heartbeat_reporter_obj = core::ClassLoader::getDefaultClassLoader().instantiate(reporter, reporter); |
| if (heartbeat_reporter_obj == nullptr) { |
| logger_->log_debug("Could not instantiate %s", reporter); |
| } else { |
| std::shared_ptr<HeartBeatReporter> shp_reporter = std::static_pointer_cast<HeartBeatReporter>(heartbeat_reporter_obj); |
| shp_reporter->initialize(controller_, update_sink_, configuration_); |
| heartbeat_protocols_.push_back(shp_reporter); |
| } |
| } |
| } |
| |
| std::string trigger_classes; |
| if (configure->get("nifi.c2.agent.trigger.classes", "c2.agent.trigger.classes", trigger_classes)) { |
| std::vector<std::string> triggers = utils::StringUtils::split(trigger_classes, ","); |
| std::lock_guard<std::mutex> lock(heartbeat_mutex); |
| for (const auto& trigger : triggers) { |
| auto trigger_obj = core::ClassLoader::getDefaultClassLoader().instantiate(trigger, trigger); |
| if (trigger_obj == nullptr) { |
| logger_->log_debug("Could not instantiate %s", trigger); |
| } else { |
| std::shared_ptr<C2Trigger> trg_impl = std::static_pointer_cast<C2Trigger>(trigger_obj); |
| trg_impl->initialize(configuration_); |
| triggers_.push_back(trg_impl); |
| } |
| } |
| } |
| |
| auto base_reporter = "ControllerSocketProtocol"; |
| auto heartbeat_reporter_obj = core::ClassLoader::getDefaultClassLoader().instantiate(base_reporter, base_reporter); |
| if (heartbeat_reporter_obj == nullptr) { |
| logger_->log_debug("Could not instantiate %s", base_reporter); |
| } else { |
| std::shared_ptr<HeartBeatReporter> shp_reporter = std::static_pointer_cast<HeartBeatReporter>(heartbeat_reporter_obj); |
| shp_reporter->initialize(controller_, update_sink_, configuration_); |
| heartbeat_protocols_.push_back(shp_reporter); |
| } |
| } |
| |
| void C2Agent::performHeartBeat() { |
| C2Payload payload(Operation::HEARTBEAT); |
| |
| logger_->log_trace("Performing heartbeat"); |
| |
| std::map<std::string, std::shared_ptr<state::response::ResponseNode>> metrics_copy; |
| { |
| std::lock_guard<std::timed_mutex> lock(metrics_mutex_); |
| if (metrics_map_.size() > 0) { |
| metrics_copy = std::move(metrics_map_); |
| } |
| } |
| |
| if (metrics_copy.size() > 0) { |
| C2Payload metrics(Operation::HEARTBEAT); |
| metrics.setLabel("metrics"); |
| |
| for (auto metric : metrics_copy) { |
| if (metric.second->serialize().size() == 0) |
| continue; |
| C2Payload child_metric_payload(Operation::HEARTBEAT); |
| child_metric_payload.setLabel(metric.first); |
| serializeMetrics(child_metric_payload, metric.first, metric.second->serialize(), metric.second->isArray()); |
| metrics.addPayload(std::move(child_metric_payload)); |
| } |
| payload.addPayload(std::move(metrics)); |
| } |
| |
| for (auto metric : root_response_nodes_) { |
| C2Payload child_metric_payload(Operation::HEARTBEAT); |
| bool isArray{false}; |
| std::string metricName; |
| std::vector<state::response::SerializedResponseNode> metrics; |
| std::shared_ptr<state::response::NodeReporter> reporter; |
| std::shared_ptr<state::response::ResponseNode> agentInfo; |
| |
| // Send agent manifest in first heartbeat |
| if (!manifest_sent_ |
| && (reporter = std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_)) |
| && (agentInfo = reporter->getAgentInformation()) |
| && metric.first == agentInfo->getName()) { |
| metricName = agentInfo->getName(); |
| isArray = agentInfo->isArray(); |
| metrics = agentInfo->serialize(); |
| manifest_sent_ = true; |
| } else { |
| metricName = metric.first; |
| isArray = metric.second->isArray(); |
| metrics = metric.second->serialize(); |
| } |
| child_metric_payload.setLabel(metricName); |
| if (isArray) { |
| child_metric_payload.setContainer(true); |
| } |
| serializeMetrics(child_metric_payload, metricName, metrics, isArray); |
| payload.addPayload(std::move(child_metric_payload)); |
| } |
| C2Payload && response = protocol_.load()->consumePayload(payload); |
| |
| enqueue_c2_server_response(std::move(response)); |
| |
| std::lock_guard<std::mutex> lock(heartbeat_mutex); |
| |
| for (auto reporter : heartbeat_protocols_) { |
| reporter->heartbeat(payload); |
| } |
| } |
| |
| void C2Agent::serializeMetrics(C2Payload &metric_payload, const std::string &name, const std::vector<state::response::SerializedResponseNode> &metrics, bool is_container, bool is_collapsible) { |
| for (const auto &metric : metrics) { |
| if (metric.children.size() > 0) { |
| C2Payload child_metric_payload(metric_payload.getOperation()); |
| if (metric.array) { |
| child_metric_payload.setContainer(true); |
| } |
| auto collapsible = !metric.collapsible ? metric.collapsible : is_collapsible; |
| child_metric_payload.setCollapsible(collapsible); |
| child_metric_payload.setLabel(metric.name); |
| serializeMetrics(child_metric_payload, metric.name, metric.children, is_container, collapsible); |
| metric_payload.addPayload(std::move(child_metric_payload)); |
| } else { |
| C2ContentResponse response(metric_payload.getOperation()); |
| response.name = name; |
| response.operation_arguments[metric.name] = metric.value; |
| metric_payload.addContent(std::move(response), is_collapsible); |
| } |
| } |
| } |
| |
| void C2Agent::extractPayload(const C2Payload &&resp) { |
| if (resp.getStatus().getState() == state::UpdateState::NESTED) { |
| const std::vector<C2Payload> &payloads = resp.getNestedPayloads(); |
| |
| for (const auto &payload : payloads) { |
| extractPayload(std::move(payload)); |
| } |
| return; |
| } |
| switch (resp.getStatus().getState()) { |
| case state::UpdateState::INITIATE: |
| logger_->log_debug("Received initiation event from protocol"); |
| break; |
| case state::UpdateState::READ_COMPLETE: |
| logger_->log_trace("Received Ack from Server"); |
| // we have a heartbeat response. |
| for (const auto &server_response : resp.getContent()) { |
| handle_c2_server_response(server_response); |
| } |
| break; |
| case state::UpdateState::FULLY_APPLIED: |
| logger_->log_debug("Received fully applied event from protocol"); |
| break; |
| case state::UpdateState::PARTIALLY_APPLIED: |
| logger_->log_debug("Received partially applied event from protocol"); |
| break; |
| case state::UpdateState::NOT_APPLIED: |
| logger_->log_debug("Received not applied event from protocol"); |
| break; |
| case state::UpdateState::SET_ERROR: |
| logger_->log_debug("Received error event from protocol"); |
| break; |
| case state::UpdateState::READ_ERROR: |
| logger_->log_debug("Received error event from protocol"); |
| break; |
| case state::UpdateState::NESTED: // multiple updates embedded into one |
| |
| default: |
| logger_->log_debug("Received nested event from protocol"); |
| break; |
| } |
| } |
| |
| void C2Agent::extractPayload(const C2Payload &resp) { |
| if (resp.getStatus().getState() == state::UpdateState::NESTED) { |
| const std::vector<C2Payload> &payloads = resp.getNestedPayloads(); |
| for (const auto &payload : payloads) { |
| extractPayload(payload); |
| } |
| } |
| switch (resp.getStatus().getState()) { |
| case state::UpdateState::READ_COMPLETE: |
| // we have a heartbeat response. |
| for (const auto &server_response : resp.getContent()) { |
| handle_c2_server_response(server_response); |
| } |
| break; |
| default: |
| break; |
| } |
| } |
| |
| void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) { |
| switch (resp.op) { |
| case Operation::CLEAR: |
| // we've been told to clear something |
| if (resp.name == "connection") { |
| for (auto connection : resp.operation_arguments) { |
| logger_->log_debug("Clearing connection %s", connection.second.to_string()); |
| update_sink_->clearConnection(connection.second.to_string()); |
| } |
| C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); |
| enqueue_c2_response(std::move(response)); |
| } else if (resp.name == "repositories") { |
| update_sink_->drainRepositories(); |
| C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); |
| enqueue_c2_response(std::move(response)); |
| } else { |
| logger_->log_debug("Clearing unknown %s", resp.name); |
| } |
| |
| break; |
| case Operation::UPDATE: { |
| handle_update(resp); |
| } |
| break; |
| |
| case Operation::DESCRIBE: |
| handle_describe(resp); |
| break; |
| case Operation::RESTART: { |
| update_sink_->stop(true); |
| C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); |
| protocol_.load()->consumePayload(std::move(response)); |
| exit(1); |
| } |
| break; |
| case Operation::START: |
| case Operation::STOP: { |
| if (resp.name == "C2" || resp.name == "c2") { |
| raise(SIGTERM); |
| } |
| |
| std::vector<std::shared_ptr<state::StateController>> components = update_sink_->getComponents(resp.name); |
| |
| // stop all referenced components. |
| for (auto &component : components) { |
| logger_->log_debug("Stopping component %s", component->getComponentName()); |
| if (resp.op == Operation::STOP) |
| component->stop(true); |
| else |
| component->start(); |
| } |
| |
| if (resp.ident.length() > 0) { |
| C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); |
| enqueue_c2_response(std::move(response)); |
| } |
| } |
| // |
| break; |
| default: |
| break; |
| // do nothing |
| } |
| } |
| |
| C2Payload C2Agent::prepareConfigurationOptions(const C2ContentResponse &resp) const { |
| auto unsanitized_keys = configuration_->getConfiguredKeys(); |
| std::vector<std::string> keys; |
| std::copy_if(unsanitized_keys.begin(), unsanitized_keys.end(), std::back_inserter(keys), |
| [](std::string key) {return key.find("pass") == std::string::npos;}); |
| |
| C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); |
| C2Payload options(Operation::ACKNOWLEDGE); |
| options.setLabel("configuration_options"); |
| std::string value; |
| for (auto key : keys) { |
| C2ContentResponse option(Operation::ACKNOWLEDGE); |
| option.name = key; |
| if (configuration_->get(key, value)) { |
| option.operation_arguments[key] = value; |
| options.addContent(std::move(option)); |
| } |
| } |
| response.addPayload(std::move(options)); |
| return response; |
| } |
| |
| /** |
| * Descriptions are special types of requests that require information |
| * to be put into the acknowledgement |
| */ |
| void C2Agent::handle_describe(const C2ContentResponse &resp) { |
| if (resp.name == "metrics") { |
| auto reporter = std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_); |
| |
| if (reporter != nullptr) { |
| auto metricsClass = resp.operation_arguments.find("metricsClass"); |
| uint8_t metric_class_id = 0; |
| if (metricsClass != resp.operation_arguments.end()) { |
| // we have a class |
| try { |
| metric_class_id = std::stoi(metricsClass->second.to_string()); |
| } catch (...) { |
| logger_->log_error("Could not convert %s into an integer", metricsClass->second.to_string()); |
| } |
| } |
| |
| std::vector<std::shared_ptr<state::response::ResponseNode>> metrics_vec; |
| C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); |
| C2Payload metrics(Operation::ACKNOWLEDGE); |
| metrics.setLabel("metrics"); |
| reporter->getResponseNodes(metrics_vec, 0); |
| for (auto metric : metrics_vec) { |
| serializeMetrics(metrics, metric->getName(), metric->serialize()); |
| } |
| response.addPayload(std::move(metrics)); |
| enqueue_c2_response(std::move(response)); |
| } |
| |
| } else if (resp.name == "configuration") { |
| auto configOptions = prepareConfigurationOptions(resp); |
| enqueue_c2_response(std::move(configOptions)); |
| return; |
| } else if (resp.name == "manifest") { |
| C2Payload response(prepareConfigurationOptions(resp)); |
| |
| auto reporter = std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_); |
| if (reporter != nullptr) { |
| std::vector<std::shared_ptr<state::response::ResponseNode>> metrics_vec; |
| |
| C2Payload agentInfo(Operation::ACKNOWLEDGE, resp.ident, false, true); |
| agentInfo.setLabel("agentInfo"); |
| |
| reporter->getManifestNodes(metrics_vec); |
| for (const auto& metric : metrics_vec) { |
| serializeMetrics(agentInfo, metric->getName(), metric->serialize()); |
| } |
| response.addPayload(std::move(agentInfo)); |
| } |
| |
| enqueue_c2_response(std::move(response)); |
| return; |
| } else if (resp.name == "jstack") { |
| if (update_sink_->isRunning()) { |
| const std::vector<BackTrace> traces = update_sink_->getTraces(); |
| for (const auto &trace : traces) { |
| for (const auto & line : trace.getTraces()) { |
| logger_->log_trace("%s -- %s", trace.getName(), line); |
| } |
| } |
| auto keys = configuration_->getConfiguredKeys(); |
| C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); |
| for (const auto &trace : traces) { |
| C2Payload options(Operation::ACKNOWLEDGE, resp.ident, false, true); |
| options.setLabel(trace.getName()); |
| std::string value; |
| for (const auto &line : trace.getTraces()) { |
| C2ContentResponse option(Operation::ACKNOWLEDGE); |
| option.name = line; |
| option.operation_arguments[line] = line; |
| options.addContent(std::move(option)); |
| } |
| response.addPayload(std::move(options)); |
| } |
| enqueue_c2_response(std::move(response)); |
| } |
| return; |
| } |
| C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); |
| enqueue_c2_response(std::move(response)); |
| } |
| |
| void C2Agent::handle_update(const C2ContentResponse &resp) { |
| // we've been told to update something |
| if (resp.name == "configuration") { |
| auto url = resp.operation_arguments.find("location"); |
| |
| auto persist = resp.operation_arguments.find("persist"); |
| |
| if (url != resp.operation_arguments.end()) { |
| // just get the raw data. |
| C2Payload payload(Operation::TRANSFER, false, true); |
| |
| auto urlStr = url->second.to_string(); |
| |
| std::string file_path = urlStr; |
| bool containsHttp = file_path.find("http") != std::string::npos; |
| if (!containsHttp) { |
| std::ifstream new_conf(file_path); |
| if (!new_conf.good()) { |
| containsHttp = true; |
| } |
| } |
| if (nullptr != protocol_.load() && containsHttp) { |
| std::stringstream newUrl; |
| if (urlStr.find("http") == std::string::npos) { |
| std::string base; |
| if (configuration_->get(minifi::Configure::nifi_c2_flow_base_url, base)) { |
| newUrl << base; |
| if (!utils::StringUtils::endsWith(base, "/")) { |
| newUrl << "/"; |
| } |
| newUrl << urlStr; |
| urlStr = newUrl.str(); |
| } else if (configuration_->get("c2.rest.url", base)) { |
| std::string host, protocol; |
| int port = -1; |
| utils::parse_url(&base, &host, &port, &protocol); |
| newUrl << protocol << host; |
| if (port > 0) { |
| newUrl << ":" << port; |
| } |
| newUrl << "/c2/api/" << urlStr; |
| urlStr = newUrl.str(); |
| } |
| } |
| |
| C2Payload &&response = protocol_.load()->consumePayload(urlStr, payload, RECEIVE, false); |
| |
| auto raw_data = response.getRawData(); |
| file_path = std::string(raw_data.data(), raw_data.size()); |
| } |
| |
| std::ifstream new_conf(file_path); |
| std::string raw_data_str((std::istreambuf_iterator<char>(new_conf)), std::istreambuf_iterator<char>()); |
| unlink(file_path.c_str()); |
| // if we can apply the update, we will acknowledge it and then backup the configuration file. |
| if (update_sink_->applyUpdate(urlStr, raw_data_str)) { |
| C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::FULLY_APPLIED, resp.ident, false, true); |
| enqueue_c2_response(std::move(response)); |
| |
| if (persist != resp.operation_arguments.end() && utils::StringUtils::equalsIgnoreCase(persist->second.to_string(), "true")) { |
| // update nifi.flow.configuration.file=./conf/config.yml |
| std::string config_file; |
| |
| configuration_->get(minifi::Configure::nifi_flow_configuration_file, config_file); |
| std::string adjustedFilename; |
| if (config_file[0] != '/') { |
| adjustedFilename = adjustedFilename + configuration_->getHome() + "/" + config_file; |
| } else { |
| adjustedFilename += config_file; |
| } |
| |
| config_file = adjustedFilename; |
| |
| std::stringstream config_file_backup; |
| config_file_backup << config_file << ".bak"; |
| // we must be able to successfully copy the file. |
| bool persist_config = true; |
| bool backup_file = false; |
| std::string backup_config; |
| |
| if (configuration_->get(minifi::Configure::nifi_flow_configuration_file_backup_update, backup_config) && utils::StringUtils::StringToBool(backup_config, backup_file)) { |
| if (utils::file::FileUtils::copy_file(config_file, config_file_backup.str()) != 0) { |
| logger_->log_debug("Cannot copy %s to %s", config_file, config_file_backup.str()); |
| persist_config = false; |
| } |
| } |
| logger_->log_debug("Copy %s to %s %d", config_file, config_file_backup.str(), persist_config); |
| if (persist_config) { |
| std::ofstream writer(config_file); |
| if (writer.is_open()) { |
| writer.write(raw_data_str.data(), raw_data_str.size()); |
| } |
| writer.close(); |
| |
| // update the flow id |
| configuration_->persistProperties(); |
| } |
| } |
| } else { |
| logger_->log_debug("update failed."); |
| C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::SET_ERROR, resp.ident, false, true); |
| response.setRawData("Error while applying flow. Likely missing processors"); |
| enqueue_c2_response(std::move(response)); |
| } |
| // send |
| } else { |
| logger_->log_debug("Did not have location within %s", resp.ident); |
| auto update_text = resp.operation_arguments.find("configuration_data"); |
| if (update_text != resp.operation_arguments.end()) { |
| if (update_sink_->applyUpdate(url->second.to_string(), update_text->second.to_string()) != 0 && persist != resp.operation_arguments.end() |
| && utils::StringUtils::equalsIgnoreCase(persist->second.to_string(), "true")) { |
| C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::FULLY_APPLIED, resp.ident, false, true); |
| enqueue_c2_response(std::move(response)); |
| // update nifi.flow.configuration.file=./conf/config.yml |
| std::string config_file; |
| std::stringstream config_file_backup; |
| config_file_backup << config_file << ".bak"; |
| |
| bool persist_config = true; |
| bool backup_file = false; |
| std::string backup_config; |
| |
| if (configuration_->get(minifi::Configure::nifi_flow_configuration_file_backup_update, backup_config) && utils::StringUtils::StringToBool(backup_config, backup_file)) { |
| if (utils::file::FileUtils::copy_file(config_file, config_file_backup.str()) != 0) { |
| persist_config = false; |
| } |
| } |
| if (persist_config) { |
| std::ofstream writer(config_file); |
| if (writer.is_open()) { |
| auto output = update_text->second.to_string(); |
| writer.write(output.c_str(), output.size()); |
| } |
| writer.close(); |
| } |
| } else { |
| C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::SET_ERROR, resp.ident, false, true); |
| enqueue_c2_response(std::move(response)); |
| } |
| } |
| } |
| } else if (resp.name == "properties") { |
| bool update_occurred = false; |
| for (auto entry : resp.operation_arguments) { |
| if (update_property(entry.first, entry.second.to_string())) |
| update_occurred = true; |
| } |
| if (update_occurred) { |
| // enable updates to persist the configuration. |
| } |
| } else if (resp.name == "c2") { |
| // prior configuration options were already in place. thus |
| // we clear the map so that we don't go through replacing |
| // unnecessary objects. |
| running_c2_configuration->clear(); |
| |
| for (auto entry : resp.operation_arguments) { |
| bool can_update = true; |
| if (nullptr != update_service_) { |
| can_update = update_service_->canUpdate(entry.first); |
| } |
| if (can_update) |
| running_c2_configuration->set(entry.first, entry.second.to_string()); |
| } |
| |
| if (resp.operation_arguments.size() > 0) |
| configure(running_c2_configuration); |
| C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::FULLY_APPLIED, resp.ident, false, true); |
| enqueue_c2_response(std::move(response)); |
| } else if (resp.name == "agent") { |
| // we are upgrading the agent. therefore we must be given a location |
| auto location = resp.operation_arguments.find("location"); |
| auto isPartialStr = resp.operation_arguments.find("partial"); |
| |
| bool partial_update = false; |
| if (isPartialStr != std::end(resp.operation_arguments)) { |
| partial_update = utils::StringUtils::equalsIgnoreCase(isPartialStr->second.to_string(), "true"); |
| } |
| if (location != resp.operation_arguments.end()) { |
| logger_->log_trace("Update agent with location %s", location->second.to_string()); |
| // we will not have a raw payload |
| C2Payload payload(Operation::TRANSFER, false, true); |
| |
| C2Payload &&response = protocol_.load()->consumePayload(location->second.to_string(), payload, RECEIVE, false); |
| |
| auto raw_data = response.getRawData(); |
| |
| std::string file_path = std::string(raw_data.data(), raw_data.size()); |
| |
| logger_->log_trace("Update requested with file %s", file_path); |
| |
| // acknowledge the transfer. For a transfer, the response identifier should be the checksum of the |
| // file transferred. |
| C2Payload transfer_response(Operation::ACKNOWLEDGE, state::UpdateState::FULLY_APPLIED, response.getIdentifier(), false, true); |
| |
| protocol_.load()->consumePayload(std::move(transfer_response)); |
| |
| if (allow_updates_) { |
| logger_->log_trace("Update allowed from file %s", file_path); |
| if (partial_update && !bin_location_.empty()) { |
| utils::file::DiffUtils::apply_binary_diff(bin_location_.c_str(), file_path.c_str(), update_location_.c_str()); |
| } else { |
| utils::file::FileUtils::copy_file(file_path, update_location_); |
| } |
| // remove the downloaded file. |
| logger_->log_trace("removing file %s", file_path); |
| unlink(file_path.c_str()); |
| update_agent(); |
| } else { |
| logger_->log_trace("Update disallowed from file %s", file_path); |
| } |
| |
| } else { |
| logger_->log_trace("No location present"); |
| } |
| } else { |
| C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::NOT_APPLIED, resp.ident, false, true); |
| enqueue_c2_response(std::move(response)); |
| } |
| } |
| |
| /** |
| * Updates a property |
| */ |
| bool C2Agent::update_property(const std::string &property_name, const std::string &property_value, bool persist) { |
| if (update_service_->canUpdate(property_name)) { |
| configuration_->set(property_name, property_value); |
| if (persist) { |
| configuration_->persistProperties(); |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| void C2Agent::restart_agent() { |
| char cwd[1024]; |
| if (getcwd(cwd, sizeof(cwd)) == nullptr) { |
| logger_->log_error("Could not restart agent, reason %s", std::strerror(errno)); |
| return; |
| } |
| |
| std::stringstream command; |
| command << cwd << "/minifi.sh restart"; |
| } |
| |
| void C2Agent::update_agent() { |
| if (!system(update_command_.c_str())) { |
| logger_->log_warn("May not have command processor"); |
| } |
| } |
| |
| int16_t C2Agent::setResponseNodes(const std::shared_ptr<state::response::ResponseNode> &metric) { |
| auto now = std::chrono::steady_clock::now(); |
| if (metrics_mutex_.try_lock_until(now + std::chrono::seconds(1))) { |
| root_response_nodes_[metric->getName()] = metric; |
| metrics_mutex_.unlock(); |
| return 0; |
| } |
| return -1; |
| } |
| |
| int16_t C2Agent::setMetricsNodes(const std::shared_ptr<state::response::ResponseNode> &metric) { |
| auto now = std::chrono::steady_clock::now(); |
| if (metrics_mutex_.try_lock_until(now + std::chrono::seconds(1))) { |
| metrics_map_[metric->getName()] = metric; |
| metrics_mutex_.unlock(); |
| return 0; |
| } |
| return -1; |
| } |
| |
| } /* namespace c2 */ |
| } /* namespace minifi */ |
| } /* namespace nifi */ |
| } /* namespace apache */ |
| } /* namespace org */ |