blob: 7069854a58e47ae030afec24538b77129aabee12 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <string_view>
#include <unordered_set>
#include <unordered_map>
#include <utility>
#include <vector>
#include "core/ConfigurableComponentImpl.h"
#include "core/Connectable.h"
#include "minifi-cpp/core/Property.h"
#include "core/Core.h"
#include "minifi-cpp/core/Annotation.h"
#include "minifi-cpp/core/DynamicProperty.h"
#include "minifi-cpp/core/Scheduling.h"
#include "minifi-cpp/core/state/nodes/MetricsBase.h"
#include "core/ProcessorMetrics.h"
#include "minifi-cpp/utils/gsl.h"
#include "utils/Id.h"
#include "minifi-cpp/core/OutputAttributeDefinition.h"
#include "Processor.h"
#include "minifi-cpp/core/ProcessorApi.h"
namespace org::apache::nifi::minifi {
class Connection;
namespace core {
class ProcessContext;
class ProcessSession;
class ProcessSessionFactory;
class Processor : public ConnectableImpl, public ConfigurableComponentImpl, public state::response::ResponseNodeSource {
public:
Processor(std::string_view name, const utils::Identifier& uuid, std::unique_ptr<ProcessorApi> impl);
explicit Processor(std::string_view name, std::unique_ptr<ProcessorApi> impl);
Processor(const Processor& parent) = delete;
Processor& operator=(const Processor& parent) = delete;
bool isRunning() const override;
~Processor() override;
void setScheduledState(ScheduledState state);
ScheduledState getScheduledState() const;
void setSchedulingStrategy(SchedulingStrategy strategy);
SchedulingStrategy getSchedulingStrategy() const;
void setSchedulingPeriod(std::chrono::steady_clock::duration period);
std::chrono::steady_clock::duration getSchedulingPeriod() const;
void setCronPeriod(const std::string &period);
std::string getCronPeriod() const;
void setRunDurationNano(std::chrono::steady_clock::duration period);
std::chrono::steady_clock::duration getRunDurationNano() const;
void setYieldPeriodMsec(std::chrono::milliseconds period);
std::chrono::steady_clock::duration getYieldPeriod() const;
void setPenalizationPeriod(std::chrono::milliseconds period);
void setMaxConcurrentTasks(uint8_t tasks) override;
bool isSingleThreaded() const;
std::string getProcessorType() const;
bool getTriggerWhenEmpty() const;
uint8_t getActiveTasks() const;
void incrementActiveTasks();
void decrementActiveTask();
void clearActiveTask();
std::string getProcessGroupUUIDStr() const;
void setProcessGroupUUIDStr(const std::string &uuid);
void yield() override;
void yield(std::chrono::steady_clock::duration delta_time);
bool isYield() const;
void clearYield();
std::chrono::steady_clock::time_point getYieldExpirationTime() const;
std::chrono::steady_clock::duration getYieldTime() const;
bool addConnection(Connectable* connection);
bool canEdit() override;
void initialize() override;
void triggerAndCommit(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory);
void trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session);
void onTrigger(ProcessContext& context, ProcessSession& session);
void onSchedule(ProcessContext& context, ProcessSessionFactory& session_factory);
void onUnSchedule();
bool isWorkAvailable() override;
bool isThrottledByBackpressure() const;
Connectable* pickIncomingConnection() override;
void validateAnnotations() const;
annotation::Input getInputRequirement() const;
[[nodiscard]] bool supportsDynamicProperties() const override;
[[nodiscard]] bool supportsDynamicRelationships() const override;
state::response::SharedResponseNode getResponseNode() override;
gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const;
std::shared_ptr<ProcessorMetricsExtension> getMetricsExtension() const;
std::string getProcessGroupName() const;
void setProcessGroupName(const std::string &name);
std::string getProcessGroupPath() const;
void setProcessGroupPath(const std::string &path);
logging::LOG_LEVEL getLogBulletinLevel() const;
void setLogBulletinLevel(logging::LOG_LEVEL level);
void setLoggerCallback(const std::function<void(logging::LOG_LEVEL level, const std::string& message)>& callback);
void restore(const std::shared_ptr<FlowFile>& file) override;
static constexpr auto DynamicProperties = std::array<DynamicProperty, 0>{};
static constexpr auto OutputAttributes = std::array<OutputAttributeReference, 0>{};
ProcessorApi& getImpl() const {
gsl_Assert(impl_);
return *impl_;
}
template<typename T>
T& getImpl() const {
auto* res = dynamic_cast<T*>(&getImpl());
gsl_Assert(res);
return *res;
}
protected:
std::atomic<ScheduledState> state_;
std::atomic<std::chrono::steady_clock::duration> scheduling_period_;
std::atomic<std::chrono::steady_clock::duration> run_duration_;
std::atomic<std::chrono::steady_clock::duration> yield_period_;
std::atomic<uint8_t> active_tasks_;
std::string cron_period_;
std::shared_ptr<logging::Logger> logger_;
logging::LOG_LEVEL log_bulletin_level_ = logging::LOG_LEVEL::warn;
private:
mutable std::mutex mutex_;
std::atomic<std::chrono::steady_clock::time_point> yield_expiration_{};
// must hold the graphMutex
void updateReachability(const std::lock_guard<std::mutex>& graph_lock, bool force = false);
const std::unordered_map<Connection*, std::unordered_set<Processor*>>& reachable_processors() const;
static bool partOfCycle(Connection* conn);
// an outgoing connection allows us to reach these nodes
std::unordered_map<Connection*, std::unordered_set<Processor*>> reachable_processors_;
std::string process_group_uuid_;
std::string process_group_name_;
std::string process_group_path_;
gsl::not_null<std::shared_ptr<ProcessorMetrics>> metrics_;
protected:
std::unique_ptr<ProcessorApi> impl_;
};
} // namespace core
} // namespace org::apache::nifi::minifi