blob: 4f89cb6654c321da2d7fa7d24a273cce4079c959 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <chrono>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <string_view>
#include "core/AbstractProcessor.h"
#include "core/Annotation.h"
#include "core/logging/LoggerFactory.h"
#include "core/PropertyDefinitionBuilder.h"
#include "minifi-cpp/core/PropertyValidator.h"
#include "core/RelationshipDefinition.h"
#include "RollingWindow.h"
#include "core/StateManager.h"
namespace org::apache::nifi::minifi::processors {
class AttributeRollingWindow final : public core::AbstractProcessor<AttributeRollingWindow> {
public:
using core::AbstractProcessor<AttributeRollingWindow>::AbstractProcessor;
EXTENSIONAPI static constexpr auto Description = "Track a Rolling Window based on evaluating an Expression Language "
"expression on each FlowFile. Each FlowFile will be emitted with the count of FlowFiles and total aggregate value"
" of values processed in the current window.";
EXTENSIONAPI static constexpr auto ValueToTrack = core::PropertyDefinitionBuilder<>::createProperty("Value to track")
.withDescription("The expression on which to evaluate each FlowFile. The result of the expression will be added "
"to the rolling window value.")
.isRequired(true)
.supportsExpressionLanguage(true)
.build();
EXTENSIONAPI static constexpr auto TimeWindow = core::PropertyDefinitionBuilder<>::createProperty("Time window")
.withDescription("The amount of time for a rolling window. The format of the value is expected to be a "
"count followed by a time unit. For example 5 millis, 10 secs, 1 min, 3 hours, 2 days, etc.")
.withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR)
.build();
EXTENSIONAPI static constexpr auto WindowLength = core::PropertyDefinitionBuilder<>::createProperty("Window length")
.withDescription("The window length in number of values. Takes precedence over 'Time window'. If set to zero, "
"the 'Time window' property is used instead.")
.isRequired(true)
.withDefaultValue("0")
.withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)
.build();
EXTENSIONAPI static constexpr auto AttributeNamePrefix = core::PropertyDefinitionBuilder<>::createProperty("Attribute name prefix")
.withDescription("The prefix to add to the generated attribute names. For example, if this is set to 'rolling.window.', "
"then the full attribute names will be 'rolling.window.value', 'rolling.window.count', etc.")
.isRequired(true)
.withDefaultValue("rolling.window.")
.build();
EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({
ValueToTrack,
TimeWindow,
WindowLength,
AttributeNamePrefix
});
EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "All FlowFiles that are "
"successfully processed are routed to this relationship."};
EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "When a FlowFile fails, "
"it is routed here."};
EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure};
EXTENSIONAPI static constexpr auto Count = core::OutputAttributeDefinition<>{"<prefix>count", {Success}, "Number of the values in the rolling window"};
EXTENSIONAPI static constexpr auto Value = core::OutputAttributeDefinition<>{"<prefix>value", {Success}, "Sum of the values in the rolling window"};
EXTENSIONAPI static constexpr auto Mean = core::OutputAttributeDefinition<>{"<prefix>mean", {Success}, "Mean of the values in the rolling window"};
EXTENSIONAPI static constexpr auto Median = core::OutputAttributeDefinition<>{"<prefix>median", {Success}, "Median of the values in the rolling window"};
EXTENSIONAPI static constexpr auto Variance = core::OutputAttributeDefinition<>{"<prefix>variance", {Success}, "Variance of the values in the rolling window"};
EXTENSIONAPI static constexpr auto Stddev = core::OutputAttributeDefinition<>{"<prefix>stddev", {Success}, "Standard deviation of the values in the rolling window"};
EXTENSIONAPI static constexpr auto Min = core::OutputAttributeDefinition<>{"<prefix>min", {Success}, "Smallest value in the rolling window"};
EXTENSIONAPI static constexpr auto Max = core::OutputAttributeDefinition<>{"<prefix>max", {Success}, "Largest value in the rolling window"};
EXTENSIONAPI static constexpr auto OutputAttributes = std::array<core::OutputAttributeReference, 8>{
Count, Value, Mean, Median, Variance, Stddev, Min, Max};
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
void onSchedule(core::ProcessContext&, core::ProcessSessionFactory&) final;
void onTrigger(core::ProcessContext&, core::ProcessSession&) final;
private:
bool runningInvariant() const {
// Either time_window_ or window_length_ must be set. If window_length_ is set, it is > 0.
return (time_window_ || window_length_) && (!window_length_ || *window_length_ > 0);
}
void calculateAndSetAttributes(core::FlowFile&, std::span<const double> sorted_values) const;
mutable std::mutex state_mutex_;
standard::utils::RollingWindow<std::chrono::time_point<std::chrono::system_clock>, double> state_;
std::optional<std::chrono::milliseconds> time_window_{};
std::optional<size_t> window_length_{};
std::string attribute_name_prefix_;
};
} // namespace org::apache::nifi::minifi::processors