blob: 9aa86f23a48053c244fc4605bb9de6566246b8ef [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "KubernetesControllerService.h"
#include <vector>
extern "C" {
#include "api/CoreV1API.h"
}
#include "core/Resource.h"
#include "core/logging/LoggerFactory.h"
#include "minifi-cpp/Exception.h"
#include "minifi-cpp/utils/gsl.h"
#include "utils/StringUtils.h"
namespace org::apache::nifi::minifi::controllers {
void KubernetesControllerService::initialize() {
std::lock_guard<std::mutex> lock(initialization_mutex_);
if (initialized_) { return; }
setSupportedProperties(Properties);
initialized_ = true;
}
void KubernetesControllerService::onEnable() {
try {
api_client_ = std::make_unique<kubernetes::ApiClient>();
} catch (const std::runtime_error& ex) {
logger_->log_error("Could not create the API client in the Kubernetes Controller Service: {}", ex.what());
}
if (std::string namespace_filter = getProperty(NamespaceFilter.name).value_or(""); !namespace_filter.empty()) {
namespace_filter_ = utils::Regex{std::move(namespace_filter)};
}
if (std::string pod_name_filter = getProperty(PodNameFilter.name).value_or(""); !pod_name_filter.empty()) {
pod_name_filter_ = utils::Regex{std::move(pod_name_filter)};
}
if (std::string container_name_filter = getProperty(ContainerNameFilter.name).value_or(""); !container_name_filter.empty()) {
container_name_filter_ = utils::Regex{std::move(container_name_filter)};
}
}
namespace {

// Deleter that releases a v1_pod_list_t through v1_pod_list_free, so the list can be owned by a std::unique_ptr.
struct v1_pod_list_t_deleter {
  void operator()(v1_pod_list_t* ptr) const noexcept {
    v1_pod_list_free(ptr);
  }
};

// Owning handle for a pod list returned by the Kubernetes C client.
using v1_pod_list_unique_ptr = std::unique_ptr<v1_pod_list_t, v1_pod_list_t_deleter>;

// Calls CoreV1API_listPodForAllNamespaces with no selectors or limits, logging before the call
// and logging the client's response_code after it.
// Returns whatever pointer the API call produced, wrapped in an owning handle; callers must check it for null.
v1_pod_list_unique_ptr getPods(gsl::not_null<apiClient_t*> api_client, core::logging::Logger& logger) {
  logger.log_info("Calling Kubernetes API listPodForAllNamespaces...");

  auto pods = v1_pod_list_unique_ptr{CoreV1API_listPodForAllNamespaces(
      api_client,
      /* allowWatchBookmarks  */ 0,
      /* continue             */ nullptr,
      /* fieldSelector        */ nullptr,
      /* labelSelector        */ nullptr,
      /* limit                */ 0,
      /* pretty               */ nullptr,
      /* resourceVersion      */ nullptr,
      /* resourceVersionMatch */ nullptr,
      /* timeoutSeconds       */ 0,
      /* watch                */ 0)};

  logger.log_info("The return code of the Kubernetes API listPodForAllNamespaces call: {}", api_client->response_code);
  return pods;
}

}  // namespace
// Queries the Kubernetes API for all pods and returns one attribute map per container that passes
// the configured namespace / pod name / container name regex filters.
//
// Each map has the keys "namespace", "pod", "uid" and "container".
//
// Returns std::nullopt (after logging a warning) when there is no API client or when the pod list
// call returned a null list; otherwise returns the (possibly empty) vector of attribute maps.
//
// The pod data is read through raw pointers filled in by the C client. Pods without metadata or
// spec are skipped, null container entries are skipped, and null C strings are treated as empty
// strings, because constructing a std::string from a null pointer is undefined behavior.
std::optional<std::vector<KubernetesControllerService::AttributeMap>> KubernetesControllerService::getAttributes() {
  if (!api_client_) {
    logger_->log_warn("The Kubernetes client is not valid, unable to call the Kubernetes API");
    return std::nullopt;
  }

  const auto pod_list = getPods(api_client_->getClient(), *logger_);
  if (!pod_list) {
    logger_->log_warn("Could not find any Kubernetes pods");
    return std::nullopt;
  }

  // Null-safe conversion of a C string to std::string.
  const auto to_string = [](const char* text) { return text ? std::string{text} : std::string{}; };

  std::vector<AttributeMap> container_attribute_maps;
  listEntry_t* pod_entry = nullptr;
  // NOTE(review): list_ForEach is expected to tolerate a null list (see the C client's list.h) - confirm.
  list_ForEach(pod_entry, pod_list->items) {
    const auto* const pod = static_cast<v1_pod_t*>(pod_entry->data);
    if (!pod || !pod->metadata || !pod->spec) {
      logger_->log_warn("Skipping a Kubernetes pod without metadata or spec");
      continue;
    }

    const std::string name_space = to_string(pod->metadata->_namespace);
    const std::string pod_name = to_string(pod->metadata->name);
    const std::string uid = to_string(pod->metadata->uid);

    listEntry_t* container_entry = nullptr;
    list_ForEach(container_entry, pod->spec->containers) {
      const auto* const container = static_cast<v1_container_t*>(container_entry->data);
      if (!container) {
        continue;
      }

      const std::string container_name = to_string(container->name);
      if (matchesRegexFilters(name_space, pod_name, container_name)) {
        container_attribute_maps.push_back(AttributeMap{
            {"namespace", name_space},
            {"pod", pod_name},
            {"uid", uid},
            {"container", container_name}});
      }
    }
  }

  // Guard the count access separately: it dereferences items directly rather than going through list_ForEach.
  const auto pod_count = pod_list->items ? pod_list->items->count : 0;
  logger_->log_info("Found {} containers (after regex filtering) in {} Kubernetes pods (unfiltered)", container_attribute_maps.size(), pod_count);
  return container_attribute_maps;
}
// Convenience overload: applies the regex filters to the namespace, pod name and container name
// carried by a kubernetes::ContainerInfo by forwarding its three fields to the other overload.
bool KubernetesControllerService::matchesRegexFilters(const kubernetes::ContainerInfo& container_info) const {
  const auto& name_space = container_info.name_space;
  const auto& pod_name = container_info.pod_name;
  const auto& container_name = container_info.container_name;
  return matchesRegexFilters(name_space, pod_name, container_name);
}
// Returns true when the namespace, pod name and container name each pass their filter.
// A filter that is not set accepts every value; a filter that is set must match via utils::regexMatch.
// The checks run in the order namespace, pod name, container name and stop at the first failure.
bool KubernetesControllerService::matchesRegexFilters(const std::string& name_space, const std::string& pod_name, const std::string& container_name) const {
  const auto passes = [](const std::optional<utils::Regex>& filter, const std::string& text) -> bool {
    if (!filter) {
      return true;
    }
    return utils::regexMatch(text, *filter);
  };

  if (!passes(namespace_filter_, name_space)) {
    return false;
  }
  if (!passes(pod_name_filter_, pod_name)) {
    return false;
  }
  return passes(container_name_filter_, container_name);
}
// Registers KubernetesControllerService under the ControllerService resource category via the
// REGISTER_RESOURCE macro (defined outside this file - presumably in core/Resource.h; confirm).
REGISTER_RESOURCE(KubernetesControllerService, ControllerService);
} // namespace org::apache::nifi::minifi::controllers