| /** |
| * @file ListenHTTP.h |
| * ListenHTTP class declaration |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #ifndef __LISTEN_HTTP_H__ |
| #define __LISTEN_HTTP_H__ |
| |
| #include <memory> |
| #include <regex> |
| |
| #include <CivetServer.h> |
| |
| #include "FlowFileRecord.h" |
| #include "core/Processor.h" |
| #include "core/ProcessSession.h" |
| #include "core/Core.h" |
| #include "core/Resource.h" |
| #include "core/logging/LoggerConfiguration.h" |
| #include "utils/MinifiConcurrentQueue.h" |
| |
| namespace org { |
| namespace apache { |
| namespace nifi { |
| namespace minifi { |
| namespace processors { |
| |
| // ListenHTTP Class |
| class ListenHTTP : public core::Processor { |
| public: |
| using FlowFileBufferPair=std::pair<std::shared_ptr<FlowFileRecord>, std::unique_ptr<io::BufferStream>>; |
| |
| // Constructor |
| /*! |
| * Create a new processor |
| */ |
| ListenHTTP(std::string name, utils::Identifier uuid = utils::Identifier()) |
| : Processor(name, uuid), |
| logger_(logging::LoggerFactory<ListenHTTP>::getLogger()), |
| batch_size_(0) { |
| callbacks_.log_message = &logMessage; |
| callbacks_.log_access = &logAccess; |
| } |
| // Destructor |
| virtual ~ListenHTTP(); |
| // Processor Name |
| static constexpr char const *ProcessorName = "ListenHTTP"; |
| // Supported Properties |
| static core::Property BasePath; |
| static core::Property Port; |
| static core::Property AuthorizedDNPattern; |
| static core::Property SSLCertificate; |
| static core::Property SSLCertificateAuthority; |
| static core::Property SSLVerifyPeer; |
| static core::Property SSLMinimumVersion; |
| static core::Property HeadersAsAttributesRegex; |
| static core::Property BatchSize; |
| static core::Property BufferSize; |
| // Supported Relationships |
| static core::Relationship Success; |
| |
| void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override; |
| void initialize() override; |
| void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override; |
| std::string getPort() const; |
| bool isSecure() const; |
| |
| struct ResponseBody { |
| std::string uri; |
| std::string mime_type; |
| std::string body; |
| }; |
| |
| // HTTP request handler |
| class Handler : public CivetHandler { |
| public: |
| Handler(std::string base_uri, |
| core::ProcessContext *context, |
| std::string &&authDNPattern, |
| std::string &&headersAsAttributesPattern); |
| bool handlePost(CivetServer *server, struct mg_connection *conn) override; |
| bool handleGet(CivetServer *server, struct mg_connection *conn) override; |
| bool handleHead(CivetServer *server, struct mg_connection *conn) override; |
| |
| /** |
| * Sets a static response body string to be used for a given URI, with a number of seconds it will be kept in memory. |
| * @param response |
| */ |
| void setResponseBody(const ResponseBody& response); |
| |
| bool dequeueRequest(FlowFileBufferPair &flow_file_buffer_pair); |
| |
| private: |
| void sendHttp500(struct mg_connection *conn); |
| void sendHttp503(struct mg_connection *conn); |
| bool authRequest(mg_connection *conn, const mg_request_info *req_info) const; |
| void setHeaderAttributes(const mg_request_info *req_info, const std::shared_ptr<core::FlowFile> &flow_file) const; |
| void writeBody(mg_connection *conn, const mg_request_info *req_info, bool include_payload = true); |
| std::unique_ptr<io::BufferStream> createContentBuffer(struct mg_connection *conn, const struct mg_request_info *req_info); |
| void enqueueRequest(mg_connection *conn, const mg_request_info *req_info, std::unique_ptr<io::BufferStream>); |
| |
| std::string base_uri_; |
| std::regex auth_dn_regex_; |
| std::regex headers_as_attrs_regex_; |
| core::ProcessContext *process_context_; |
| std::shared_ptr<logging::Logger> logger_; |
| std::map<std::string, ResponseBody> response_uri_map_; |
| std::mutex uri_map_mutex_; |
| uint64_t buffer_size_; |
| utils::ConcurrentQueue<FlowFileBufferPair> request_buffer_; |
| }; |
| |
| class ResponseBodyReadCallback : public InputStreamCallback { |
| public: |
| explicit ResponseBodyReadCallback(std::string *out_str) |
| : out_str_(out_str) { |
| } |
| int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { |
| out_str_->resize(stream->size()); |
| uint64_t num_read = stream->read(reinterpret_cast<uint8_t *>(&(*out_str_)[0]), |
| gsl::narrow<int>(stream->size())); |
| |
| if (num_read != stream->size()) { |
| throw std::runtime_error("GraphReadCallback failed to fully read flow file input stream"); |
| } |
| |
| return num_read; |
| } |
| |
| private: |
| std::string *out_str_; |
| }; |
| |
| // Write callback for transferring data from HTTP request to content repo |
| class WriteCallback : public OutputStreamCallback { |
| public: |
| WriteCallback(std::unique_ptr<io::BufferStream>); |
| int64_t process(const std::shared_ptr<io::BaseStream>& stream) override; |
| |
| private: |
| std::unique_ptr<io::BufferStream> request_content_; |
| }; |
| |
| static int logMessage(const struct mg_connection *conn, const char *message) { |
| try { |
| struct mg_context* ctx = mg_get_context(conn); |
| /* CivetServer stores 'this' as the userdata when calling mg_start */ |
| CivetServer* server = static_cast<CivetServer*>(mg_get_user_data(ctx)); |
| if (server == nullptr) { |
| return 0; |
| } |
| std::shared_ptr<logging::Logger>* logger = static_cast<std::shared_ptr<logging::Logger>*>(const_cast<void*>(server->getUserContext())); |
| if (logger == nullptr) { |
| return 0; |
| } |
| logging::LOG_ERROR((*logger)) << "CivetWeb error: " << message; |
| } catch (...) { |
| } |
| return 0; |
| } |
| |
| static int logAccess(const struct mg_connection *conn, const char *message) { |
| try { |
| struct mg_context* ctx = mg_get_context(conn); |
| /* CivetServer stores 'this' as the userdata when calling mg_start */ |
| CivetServer* server = static_cast<CivetServer*>(mg_get_user_data(ctx)); |
| if (server == nullptr) { |
| return 0; |
| } |
| std::shared_ptr<logging::Logger>* logger = static_cast<std::shared_ptr<logging::Logger>*>(const_cast<void*>(server->getUserContext())); |
| if (logger == nullptr) { |
| return 0; |
| } |
| logging::LOG_DEBUG((*logger)) << "CivetWeb access: " << message; |
| } catch (...) { |
| } |
| return 0; |
| } |
| protected: |
| void notifyStop() override; |
| |
| private: |
| static const uint64_t DEFAULT_BUFFER_SIZE; |
| |
| void processIncomingFlowFile(core::ProcessSession *session); |
| void processRequestBuffer(core::ProcessSession *session); |
| |
| std::shared_ptr<logging::Logger> logger_; |
| CivetCallbacks callbacks_; |
| std::unique_ptr<CivetServer> server_; |
| std::unique_ptr<Handler> handler_; |
| std::string listeningPort; |
| uint64_t batch_size_; |
| }; |
| |
| REGISTER_RESOURCE(ListenHTTP, "Starts an HTTP Server and listens on a given base path to transform incoming requests into FlowFiles. The default URI of the Service will be " |
| "http://{hostname}:{port}/contentListener. Only HEAD, POST, and GET requests are supported. PUT, and DELETE will result in an error and the HTTP response status code 405." |
| " The response body text for all requests, by default, is empty (length of 0). A static response body can be set for a given URI by sending input files to ListenHTTP with " |
| "the http.type attribute set to response_body. The response body FlowFile filename attribute is appended to the Base Path property (separated by a /) when mapped to incoming requests. " |
| "The mime.type attribute of the response body FlowFile is used for the Content-type header in responses. Response body content can be cleared by sending an empty (size 0) " |
| "FlowFile for a given URI mapping."); |
| |
| } /* namespace processors */ |
| } /* namespace minifi */ |
| } /* namespace nifi */ |
| } /* namespace apache */ |
| } /* namespace org */ |
| |
| #endif |