blob: 411c58ac221447cf11d76a23cde66d223cf11215 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Controller.h"
#include <utility>
#include <fstream>
#include "io/BufferStream.h"
#include "c2/C2Payload.h"
#include "io/AsioStream.h"
#include "asio/ssl/context.hpp"
#include "asio/ssl/stream.hpp"
#include "asio/connect.hpp"
#include "core/logging/Logger.h"
#include "utils/net/AsioSocketUtils.h"
#include "utils/file/FileUtils.h"
namespace org::apache::nifi::minifi::controller {
bool sendSingleCommand(const utils::net::SocketData& socket_data, uint8_t op, const std::string& value) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
io::BufferStream buffer;
buffer.write(&op, 1);
buffer.write(value);
return connection_stream->write(buffer.getBuffer()) == buffer.size();
}
bool stopComponent(const utils::net::SocketData& socket_data, const std::string& component) {
return sendSingleCommand(socket_data, static_cast<uint8_t>(c2::Operation::stop), component);
}
bool startComponent(const utils::net::SocketData& socket_data, const std::string& component) {
return sendSingleCommand(socket_data, static_cast<uint8_t>(c2::Operation::start), component);
}
bool clearConnection(const utils::net::SocketData& socket_data, const std::string& connection) {
return sendSingleCommand(socket_data, static_cast<uint8_t>(c2::Operation::clear), connection);
}
bool updateFlow(const utils::net::SocketData& socket_data, std::ostream &out, const std::string& file) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
auto op = static_cast<uint8_t>(c2::Operation::update);
io::BufferStream buffer;
buffer.write(&op, 1);
buffer.write("flow");
buffer.write(file);
if (io::isError(connection_stream->write(buffer.getBuffer()))) {
return false;
}
// read the response
uint8_t resp = 0;
connection_stream->read(resp);
if (resp == static_cast<uint8_t>(c2::Operation::describe)) {
uint16_t connections = 0;
connection_stream->read(connections);
out << connections << " are full" << std::endl;
for (int i = 0; i < connections; i++) {
std::string fullcomponent;
connection_stream->read(fullcomponent);
out << fullcomponent << " is full" << std::endl;
}
}
return true;
}
bool getFullConnections(const utils::net::SocketData& socket_data, std::ostream &out) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
auto op = static_cast<uint8_t>(c2::Operation::describe);
io::BufferStream buffer;
buffer.write(&op, 1);
buffer.write("getfull");
if (io::isError(connection_stream->write(buffer.getBuffer()))) {
return false;
}
// read the response
uint8_t resp = 0;
connection_stream->read(resp);
if (resp == static_cast<uint8_t>(c2::Operation::describe)) {
uint16_t connections = 0;
connection_stream->read(connections);
out << connections << " are full" << std::endl;
for (int i = 0; i < connections; i++) {
std::string fullcomponent;
connection_stream->read(fullcomponent);
out << fullcomponent << " is full" << std::endl;
}
}
return true;
}
bool getConnectionSize(const utils::net::SocketData& socket_data, std::ostream &out, const std::string& connection) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
auto op = static_cast<uint8_t>(c2::Operation::describe);
io::BufferStream buffer;
buffer.write(&op, 1);
buffer.write("queue");
buffer.write(connection);
if (io::isError(connection_stream->write(buffer.getBuffer()))) {
return false;
}
// read the response
uint8_t resp = 0;
connection_stream->read(resp);
if (resp == static_cast<uint8_t>(c2::Operation::describe)) {
std::string size;
connection_stream->read(size);
out << "Size/Max of " << connection << " " << size << std::endl;
}
return true;
}
bool listComponents(const utils::net::SocketData& socket_data, std::ostream &out, bool show_header) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
io::BufferStream buffer;
auto op = static_cast<uint8_t>(c2::Operation::describe);
buffer.write(&op, 1);
buffer.write("components");
if (io::isError(connection_stream->write(buffer.getBuffer()))) {
return false;
}
uint16_t responses = 0;
connection_stream->read(op);
connection_stream->read(responses);
if (show_header)
out << "Components:" << std::endl;
for (int i = 0; i < responses; i++) {
std::string name;
connection_stream->read(name, false);
std::string status;
connection_stream->read(status, false);
out << name << ", running: " << status << std::endl;
}
return true;
}
bool listConnections(const utils::net::SocketData& socket_data, std::ostream &out, bool show_header) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
io::BufferStream buffer;
auto op = static_cast<uint8_t>(c2::Operation::describe);
buffer.write(&op, 1);
buffer.write("connections");
if (io::isError(connection_stream->write(buffer.getBuffer()))) {
return false;
}
uint16_t responses = 0;
connection_stream->read(op);
connection_stream->read(responses);
if (show_header)
out << "Connection Names:" << std::endl;
for (int i = 0; i < responses; i++) {
std::string name;
connection_stream->read(name, false);
out << name << std::endl;
}
return true;
}
bool printManifest(const utils::net::SocketData& socket_data, std::ostream &out) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
io::BufferStream buffer;
auto op = static_cast<uint8_t>(c2::Operation::describe);
buffer.write(&op, 1);
buffer.write("manifest");
if (io::isError(connection_stream->write(buffer.getBuffer()))) {
return false;
}
connection_stream->read(op);
std::string manifest;
connection_stream->read(manifest, true);
out << manifest << std::endl;
return true;
}
bool getJstacks(const utils::net::SocketData& socket_data, std::ostream &out) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return false;
}
io::BufferStream buffer;
auto op = static_cast<uint8_t>(c2::Operation::describe);
buffer.write(&op, 1);
buffer.write("jstack");
if (io::isError(connection_stream->write(buffer.getBuffer()))) {
return false;
}
connection_stream->read(op);
std::string manifest;
connection_stream->read(manifest, true);
out << manifest << std::endl;
return true;
}
nonstd::expected<void, std::string> getDebugBundle(const utils::net::SocketData& socket_data, const std::filesystem::path& target_dir) {
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
if (connection_stream->initialize() < 0) {
return nonstd::make_unexpected("Could not connect to remote host " + socket_data.host + ":" + std::to_string(socket_data.port));
}
io::BufferStream buffer;
auto op = static_cast<uint8_t>(c2::Operation::transfer);
buffer.write(&op, 1);
buffer.write("debug");
if (io::isError(connection_stream->write(buffer.getBuffer()))) {
return nonstd::make_unexpected("Could not write to connection " + socket_data.host + ":" + std::to_string(socket_data.port));
}
connection_stream->read(op);
size_t bundle_size = 0;
connection_stream->read(bundle_size);
if (bundle_size == 0) {
return nonstd::make_unexpected("Failed to retrieve debug bundle");
}
if (std::filesystem::exists(target_dir) && !std::filesystem::is_directory(target_dir)) {
return nonstd::make_unexpected("Object specified as the target directory already exists and it is not a directory");
}
if (!std::filesystem::exists(target_dir) && utils::file::create_dir(target_dir) != 0) {
return nonstd::make_unexpected("Failed to create target directory: " + target_dir.string());
}
std::ofstream out_file(target_dir / "debug.tar.gz");
const size_t BUFFER_SIZE = 4096;
std::array<char, BUFFER_SIZE> out_buffer{};
while (bundle_size > 0) {
const auto next_read_size = (std::min)(bundle_size, BUFFER_SIZE);
const auto size_read = connection_stream->read(std::as_writable_bytes(std::span(out_buffer).subspan(0, next_read_size)));
bundle_size -= size_read;
out_file.write(out_buffer.data(), gsl::narrow<std::streamsize>(size_read));
}
return {};
}
} // namespace org::apache::nifi::minifi::controller