blob: cf4c7f6be8a40a773b60170fbb8401899e6b3bfb [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBMINIFI_INCLUDE_CORE_FLOWCONFIGURATION_H_
#define LIBMINIFI_INCLUDE_CORE_FLOWCONFIGURATION_H_
#include "core/Core.h"
#include "Connection.h"
#include "RemoteProcessorGroupPort.h"
#include "core/controller/ControllerServiceNode.h"
#include "core/controller/StandardControllerServiceProvider.h"
#include "provenance/Provenance.h"
#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
#include "core/Processor.h"
#include "core/Resource.h"
#include "core/logging/LoggerConfiguration.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/ProcessGroup.h"
#include "io/StreamFactory.h"
#include "core/state/nodes/FlowInformation.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace core {
/**
* Purpose: Flow configuration defines the mechanism
* by which we will configure our flow controller
*/
class FlowConfiguration : public CoreComponent {
public:
/**
* Constructor that will be used for configuring
* the flow controller.
*/
explicit FlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<core::ContentRepository> content_repo,
std::shared_ptr<io::StreamFactory> stream_factory, std::shared_ptr<Configure> configuration, const std::string path)
: CoreComponent(core::getClassName<FlowConfiguration>()),
config_path_(path),
flow_file_repo_(flow_file_repo),
content_repo_(content_repo),
stream_factory_(stream_factory),
configuration_(configuration),
logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) {
controller_services_ = std::make_shared<core::controller::ControllerServiceMap>();
service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration);
flow_version_ = std::make_shared<state::response::FlowVersion>("", "default", "");
// it is okay if this has already been called
initialize_static_functions();
}
virtual ~FlowConfiguration();
// Create Processor (Node/Input/Output Port) based on the name
std::shared_ptr<core::Processor> createProcessor(std::string name, uuid_t uuid);
// Create Root Processor Group
std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name, uuid_t uuid, int version);
std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &class_name, const std::string &name, uuid_t uuid);
// Create Remote Processor Group
std::unique_ptr<core::ProcessGroup> createRemoteProcessGroup(std::string name, uuid_t uuid);
// Create Connection
std::shared_ptr<minifi::Connection> createConnection(std::string name, uuid_t uuid);
// Create Provenance Report Task
std::shared_ptr<core::Processor> createProvenanceReportTask(void);
std::shared_ptr<state::response::FlowVersion> getFlowVersion() const{
return flow_version_;
}
std::shared_ptr<Configure> getConfiguration() { // cannot be const as getters mutate the underlying map
return configuration_;
}
/**
* Returns the configuration path string
* @return config_path_
*/
const std::string &getConfigurationPath() {
return config_path_;
}
virtual std::unique_ptr<core::ProcessGroup> getRoot() {
return getRoot(config_path_);
}
std::unique_ptr<core::ProcessGroup> updateFromPayload(const std::string &source, const std::string &yamlConfigPayload);
virtual std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &yamlConfigPayload) {
return nullptr;
}
/**
* Base implementation that returns a null root pointer.
* @return Extensions should return a non-null pointer in order to
* properly configure flow controller.
*/
virtual std::unique_ptr<core::ProcessGroup> getRoot(const std::string &from_config) {
return nullptr;
}
std::shared_ptr<core::controller::StandardControllerServiceProvider> &getControllerServiceProvider() {
return service_provider_;
}
static bool add_static_func(std::string functor) {
statics_sl_funcs_.push_back(functor);
return true;
}
static void initialize_static_functions() {
std::lock_guard<std::mutex> lock(atomic_initialization_);
for (auto sl_func : statics_sl_funcs_) {
core::ClassLoader::getDefaultClassLoader().registerResource("", sl_func);
}
statics_sl_funcs_.clear();
}
protected:
void registerResource(const std::string &resource_function) {
core::ClassLoader::getDefaultClassLoader().registerResource("", resource_function);
}
void registerResource(const std::string &resource_location, const std::string &resource_function) {
core::ClassLoader::getDefaultClassLoader().registerResource(resource_location, resource_function);
}
// service provider reference.
std::shared_ptr<core::controller::StandardControllerServiceProvider> service_provider_;
// based, shared controller service map.
std::shared_ptr<core::controller::ControllerServiceMap> controller_services_;
// configuration path
std::string config_path_;
// flow file repo
std::shared_ptr<core::Repository> flow_file_repo_;
// content repository.
std::shared_ptr<core::ContentRepository> content_repo_;
// stream factory
std::shared_ptr<io::StreamFactory> stream_factory_;
std::shared_ptr<Configure> configuration_;
std::shared_ptr<state::response::FlowVersion> flow_version_;
private:
std::shared_ptr<logging::Logger> logger_;
static std::mutex atomic_initialization_;
static std::vector<std::string> statics_sl_funcs_;
};
} /* namespace core */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */
#endif /* LIBMINIFI_INCLUDE_CORE_FLOWCONFIGURATION_H_ */