blob: 8ba3e0db9d856ebaf851c7afb4bd70a65925d180 [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "PushGrafanaLoki.h"
#include "core/logging/LoggerFactory.h"
#include "grafana-loki-push.grpc.pb.h"
#include "grafana-loki-push.pb.h"
#include "grpc/grpc.h"
namespace org::apache::nifi::minifi::extensions::grafana::loki {
class PushGrafanaLokiGrpc final : public PushGrafanaLoki {
public:
EXTENSIONAPI static constexpr const char* Description = "A Grafana Loki push processor that uses the Grafana Loki Grpc port. The processor expects each flow file to contain a single log line to be "
"pushed to Grafana Loki, therefore it is usually used together with the TailFile processor.";
using PushGrafanaLoki::PushGrafanaLoki;
~PushGrafanaLokiGrpc() override = default;
EXTENSIONAPI static constexpr auto KeepAliveTime = core::PropertyDefinitionBuilder<>::createProperty("Keep Alive Time")
.withDescription("The period after which a keepalive ping is sent on the transport. If not set, then the keep alive is disabled.")
.withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR)
.build();
EXTENSIONAPI static constexpr auto KeepAliveTimeout = core::PropertyDefinitionBuilder<>::createProperty("Keep Alive Timeout")
.withDescription("The amount of time the sender of the keepalive ping waits for an acknowledgement. If it does not receive an acknowledgment within this time, "
"it will close the connection. If not set, then the default value 20 seconds is used.")
.withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR)
.build();
EXTENSIONAPI static constexpr auto MaxPingsWithoutData = core::PropertyDefinitionBuilder<>::createProperty("Max Pings Without Data")
.withDescription("The maximum number of pings that can be sent when there is no data/header frame to be sent. gRPC Core will not continue sending pings "
"if we run over the limit. Setting it to 0 allows sending pings without such a restriction. If not set, then the default value 2 is used.")
.withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)
.build();
EXTENSIONAPI static constexpr auto Properties = utils::array_cat(PushGrafanaLoki::Properties, std::to_array<core::PropertyReference>({
KeepAliveTime,
KeepAliveTimeout,
MaxPingsWithoutData,
}));
void initialize() override;
void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
protected:
void setUpStreamLabels(core::ProcessContext& context) override;
std::expected<void, std::string> submitRequest(const std::vector<std::shared_ptr<core::FlowFile>>& batched_flow_files, core::ProcessSession& session) override;
void setUpGrpcChannel(const std::string& url, core::ProcessContext& context);
std::string stream_labels_;
std::chrono::milliseconds connection_timeout_ms_{};
std::optional<std::string> tenant_id_;
std::shared_ptr<::grpc::Channel> push_channel_;
std::unique_ptr<logproto::Pusher::Stub> push_stub_;
};
} // namespace org::apache::nifi::minifi::extensions::grafana::loki