/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PutKinesisStream.h"
#include <memory>
#include <random>
#include <ranges>
#include <unordered_map>
#include "aws/kinesis/KinesisClient.h"
#include "aws/kinesis/model/PutRecordsRequest.h"
#include "minifi-cpp/core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "range/v3/algorithm/for_each.hpp"
#include "range/v3/view.hpp"
#include "utils/ProcessorConfigUtils.h"
namespace org::apache::nifi::minifi::aws::processors {

void PutKinesisStream::initialize() {
  setSupportedProperties(Properties);
  setSupportedRelationships(Relationships);
}
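
// The Kinesis PutRecords API accepts at most 500 records and 5 MB of data per request;
// onSchedule clamps the configured batch limits accordingly, with 4 MB used as a soft
// cap on the request payload.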
void PutKinesisStream::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) {
  AwsProcessor::onSchedule(context, session_factory);

  batch_size_ = parseU64Property(context, MessageBatchSize);
  if (batch_size_ == 0 || batch_size_ > 500) {
    logger_->log_warn("{} is invalid. Setting it to the maximum value of 500.", MessageBatchSize.name);
    batch_size_ = 500;
  }

  batch_data_size_soft_cap_ = parseDataSizeProperty(context, MaxBatchDataSize);
  if (batch_data_size_soft_cap_ > 4_MB) {
    logger_->log_warn("{} is invalid. Setting it to the maximum value of 4 MB.", MaxBatchDataSize.name);
    batch_data_size_soft_cap_ = 4_MB;
  }

  endpoint_override_url_ = context.getProperty(EndpointOverrideURL.name) | minifi::utils::toOptional();
}
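
// Builds a single PutRecords entry from a flow file: the partition key comes from the
// Partition Key property, falling back to the flow file's UUID, and the record data is
// the flow file's content.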
nonstd::expected<Aws::Kinesis::Model::PutRecordsRequestEntry, PutKinesisStream::BatchItemError> PutKinesisStream::createEntryFromFlowFile(const core::ProcessContext& context,
    core::ProcessSession& session,
    const std::shared_ptr<core::FlowFile>& flow_file) const {
  Aws::Kinesis::Model::PutRecordsRequestEntry entry;

  const auto partition_key = context.getProperty(AmazonKinesisStreamPartitionKey.name, flow_file.get())
      | minifi::utils::valueOrElse([&flow_file] { return flow_file->getUUID().to_string(); });
  entry.SetPartitionKey(partition_key);

  const auto [status, buffer] = session.readBuffer(flow_file);
  if (io::isError(status)) {
    logger_->log_error("Couldn't read content from {}", flow_file->getUUIDStr());
    return nonstd::make_unexpected(BatchItemError{.error_message = "Failed to read content", .error_code = std::nullopt});
  }

  Aws::Utils::ByteBuffer aws_buffer(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size());
  entry.SetData(aws_buffer);
  return entry;
}
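
// Pulls flow files from the session and groups them into per-stream PutRecords batches.
// Flow files over the 1 MB single-record limit or with an unresolvable stream name are
// routed to Failure right away; batching stops once the configured record count is
// reached or a batch grows past the data size soft cap.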
std::unordered_map<std::string, PutKinesisStream::StreamBatch> PutKinesisStream::createStreamBatches(const core::ProcessContext& context, core::ProcessSession& session) const {
  static constexpr uint64_t SINGLE_RECORD_MAX_SIZE = 1_MB;
  std::unordered_map<std::string, StreamBatch> stream_batches;
  uint64_t ff_count_in_batches = 0;
  while (ff_count_in_batches < batch_size_) {
    std::shared_ptr<core::FlowFile> flow_file = session.get();
    if (!flow_file) { break; }

    const auto flow_file_size = flow_file->getSize();
    if (flow_file_size > SINGLE_RECORD_MAX_SIZE) {
      flow_file->setAttribute(AwsKinesisErrorMessage.name, fmt::format("record too big {}, max allowed {}", flow_file_size, SINGLE_RECORD_MAX_SIZE));
      session.transfer(flow_file, Failure);
      logger_->log_error("Failed to publish record {} to Kinesis because its size was greater than {} bytes", flow_file->getUUID().to_string(), SINGLE_RECORD_MAX_SIZE);
      continue;
    }

    auto stream_name = context.getProperty(AmazonKinesisStreamName.name, flow_file.get());
    if (!stream_name) {
      logger_->log_error("Stream name is invalid due to {}", stream_name.error().message());
      flow_file->setAttribute(AwsKinesisErrorMessage.name, fmt::format("Stream name is invalid due to {}", stream_name.error().message()));
      session.transfer(flow_file, Failure);
      continue;
    }

    auto entry = createEntryFromFlowFile(context, session, flow_file);
    if (!entry) {
      flow_file->addAttribute(AwsKinesisErrorMessage.name, entry.error().error_message);
      if (entry.error().error_code) {
        flow_file->addAttribute(AwsKinesisErrorCode.name, *entry.error().error_code);
      }
      session.transfer(flow_file, Failure);
      continue;
    }

    auto [stream_batch, newly_created] = stream_batches.emplace(*stream_name, StreamBatch{});
    if (newly_created) {
      stream_batch->second.request.SetStreamName(*stream_name);
    }
    stream_batch->second.request.AddRecords(*entry);
    stream_batch->second.items.push_back(BatchItem{.flow_file = std::move(flow_file), .result = BatchItemResult{}});
    stream_batch->second.batch_size += flow_file_size;
    ++ff_count_in_batches;
    if (stream_batch->second.batch_size > batch_data_size_soft_cap_) {
      break;
    }
  }
  return stream_batches;
}
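
// Sends one PutRecords request and stores a per-record result for every item in the
// batch. A failed request marks every item with the request-level error; if the number
// of result records does not match the number of items sent, there is no way to tell
// which records succeeded, so every item is marked as failed.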
void PutKinesisStream::processBatch(StreamBatch& stream_batch, const Aws::Kinesis::KinesisClient& client) const {
  const auto put_record_result = client.PutRecords(stream_batch.request);
  if (!put_record_result.IsSuccess()) {
    ranges::for_each(stream_batch.items, [&](auto& item) {
      item.result = nonstd::make_unexpected(BatchItemError{
          .error_message = put_record_result.GetError().GetMessage(),
          .error_code = std::to_string(static_cast<int>(put_record_result.GetError().GetResponseCode()))});
    });
    return;
  }

  const auto result_records = put_record_result.GetResult().GetRecords();
  if (result_records.size() != stream_batch.items.size()) {
    logger_->log_critical("PutKinesisStream record count ({}) and result count ({}) mismatch in {}; cannot tell which records succeeded and which did not",
        stream_batch.items.size(), result_records.size(), stream_batch.request.GetStreamName());
    ranges::for_each(stream_batch.items, [&](auto& item) {
      item.result = nonstd::make_unexpected(BatchItemError{
          .error_message = "Record count mismatch",
          .error_code = std::nullopt});
    });
    return;
  }

  for (uint64_t i = 0; i < stream_batch.items.size(); i++) {
    auto& [flow_file, result] = stream_batch.items[i];
    const auto& result_record = result_records[i];
    if (!result_record.GetErrorCode().empty()) {
      result = nonstd::make_unexpected(BatchItemError{.error_message = result_record.GetErrorMessage(), .error_code = result_record.GetErrorCode()});
    } else {
      result = BatchItemResult{.sequence_number = result_record.GetSequenceNumber(), .shard_id = result_record.GetShardId()};
    }
  }
}
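
// Routes each flow file according to its per-record result: successes get the sequence
// number and shard id attributes and go to Success, failures get the error attributes
// and go to Failure.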
void PutKinesisStream::transferFlowFiles(core::ProcessSession& session, const StreamBatch& stream_batch) {
  for (const auto& batch_item : stream_batch.items) {
    if (batch_item.result) {
      batch_item.flow_file->setAttribute(AwsKinesisSequenceNumber.name, batch_item.result->sequence_number);
      batch_item.flow_file->setAttribute(AwsKinesisShardId.name, batch_item.result->shard_id);
      session.transfer(batch_item.flow_file, Success);
    } else {
      batch_item.flow_file->setAttribute(AwsKinesisErrorMessage.name, batch_item.result.error().error_message);
      if (batch_item.result.error().error_code) {
        batch_item.flow_file->setAttribute(AwsKinesisErrorCode.name, *batch_item.result.error().error_code);
      }
      session.transfer(batch_item.flow_file, Failure);
    }
  }
}
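
// Collects the incoming flow files into per-stream batches, then sends each batch with
// a single PutRecords request and transfers the flow files based on the per-record results.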
void PutKinesisStream::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
  logger_->log_trace("PutKinesisStream onTrigger");
  const auto credentials = getAWSCredentials(context, nullptr);
  if (!credentials) {
    logger_->log_error("Failed to get credentials for PutKinesisStream");
    context.yield();
    return;
  }

  auto stream_batches = createStreamBatches(context, session);
  if (stream_batches.empty()) {
    context.yield();
    return;
  }

  const auto kinesis_client = getClient(*credentials);
  for (auto& stream_batch : stream_batches | std::views::values) {
    processBatch(stream_batch, *kinesis_client);
    transferFlowFiles(session, stream_batch);
  }
}
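
// Creates a Kinesis client from the resolved credentials, honoring the optional
// Endpoint Override URL property (e.g. for pointing at a local Kinesis-compatible endpoint).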
std::unique_ptr<Aws::Kinesis::KinesisClient> PutKinesisStream::getClient(const Aws::Auth::AWSCredentials& credentials) {
  gsl_Expects(client_config_);
  auto client = std::make_unique<Aws::Kinesis::KinesisClient>(credentials, *client_config_);
  if (endpoint_override_url_) {
    client->OverrideEndpoint(*endpoint_override_url_);
  }
  return client;
}

REGISTER_RESOURCE(PutKinesisStream, Processor);

}  // namespace org::apache::nifi::minifi::aws::processors