blob: cdffa5fb55789c1409fb3c728b675471dacb006e [file] [log] [blame]
/**
* @file ListenHTTP.h
* ListenHTTP class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __LISTEN_HTTP_H__
#define __LISTEN_HTTP_H__
#include <memory>
#include <regex>
#include <CivetServer.h>
#include <concurrentqueue.h>
#include "FlowFileRecord.h"
#include "core/Processor.h"
#include "core/ProcessSession.h"
#include "core/Core.h"
#include "core/Resource.h"
#include "core/logging/LoggerConfiguration.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
// ListenHTTP Class
class ListenHTTP : public core::Processor {
public:
// Constructor
/*!
* Create a new processor
*/
ListenHTTP(std::string name, utils::Identifier uuid = utils::Identifier())
: Processor(name, uuid),
logger_(logging::LoggerFactory<ListenHTTP>::getLogger()) {
callbacks_.log_message = &log_message;
callbacks_.log_access = &log_access;
}
// Destructor
virtual ~ListenHTTP();
// Processor Name
static constexpr char const *ProcessorName = "ListenHTTP";
// Supported Properties
static core::Property BasePath;
static core::Property Port;
static core::Property AuthorizedDNPattern;
static core::Property SSLCertificate;
static core::Property SSLCertificateAuthority;
static core::Property SSLVerifyPeer;
static core::Property SSLMinimumVersion;
static core::Property HeadersAsAttributesRegex;
// Supported Relationships
static core::Relationship Success;
void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
void initialize() override;
void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
std::string getPort() const;
bool isSecure() const;
struct response_body {
std::string uri;
std::string mime_type;
std::string body;
};
// HTTP request handler
class Handler : public CivetHandler {
public:
Handler(std::string base_uri,
core::ProcessContext *context,
core::ProcessSessionFactory *sessionFactory,
std::string &&authDNPattern,
std::string &&headersAsAttributesPattern);
bool handlePost(CivetServer *server, struct mg_connection *conn);
bool handleGet(CivetServer *server, struct mg_connection *conn);
bool handleHead(CivetServer *server, struct mg_connection *conn);
/**
* Sets a static response body string to be used for a given URI, with a number of seconds it will be kept in memory.
* @param response
*/
void set_response_body(struct response_body response) {
std::lock_guard<std::mutex> guard(uri_map_mutex_);
if (response.body.empty()) {
logger_->log_info("Unregistering response body for URI '%s'",
response.uri);
response_uri_map_.erase(response.uri);
} else {
logger_->log_info("Registering response body for URI '%s' of length %lu",
response.uri,
response.body.size());
response_uri_map_[response.uri] = std::move(response);
}
}
private:
// Send HTTP 500 error response to client
void send_error_response(struct mg_connection *conn);
bool auth_request(mg_connection *conn, const mg_request_info *req_info) const;
void set_header_attributes(const mg_request_info *req_info, const std::shared_ptr<FlowFileRecord> &flow_file) const;
void write_body(mg_connection *conn, const mg_request_info *req_info, bool include_payload = true);
std::string base_uri_;
std::regex auth_dn_regex_;
std::regex headers_as_attrs_regex_;
core::ProcessContext *process_context_;
core::ProcessSessionFactory *session_factory_;
// Logger
std::shared_ptr<logging::Logger> logger_;
std::map<std::string, response_body> response_uri_map_;
std::mutex uri_map_mutex_;
};
class ResponseBodyReadCallback : public InputStreamCallback {
public:
explicit ResponseBodyReadCallback(std::string *out_str)
: out_str_(out_str) {
}
int64_t process(std::shared_ptr<io::BaseStream> stream) {
out_str_->resize(stream->size());
uint64_t num_read = stream->read(reinterpret_cast<uint8_t *>(&(*out_str_)[0]),
gsl::narrow<int>(stream->size()));
if (num_read != stream->size()) {
throw std::runtime_error("GraphReadCallback failed to fully read flow file input stream");
}
return num_read;
}
private:
std::string *out_str_;
};
// Write callback for transferring data from HTTP request to content repo
class WriteCallback : public OutputStreamCallback {
public:
WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo);
int64_t process(std::shared_ptr<io::BaseStream> stream);
private:
// Logger
std::shared_ptr<logging::Logger> logger_;
struct mg_connection *conn_;
const struct mg_request_info *req_info_;
};
static int log_message(const struct mg_connection *conn, const char *message) {
try {
struct mg_context* ctx = mg_get_context(conn);
/* CivetServer stores 'this' as the userdata when calling mg_start */
CivetServer* server = static_cast<CivetServer*>(mg_get_user_data(ctx));
if (server == nullptr) {
return 0;
}
std::shared_ptr<logging::Logger>* logger = static_cast<std::shared_ptr<logging::Logger>*>(const_cast<void*>(server->getUserContext()));
if (logger == nullptr) {
return 0;
}
logging::LOG_ERROR((*logger)) << "CivetWeb error: " << message;
} catch (...) {
}
return 0;
}
static int log_access(const struct mg_connection *conn, const char *message) {
try {
struct mg_context* ctx = mg_get_context(conn);
/* CivetServer stores 'this' as the userdata when calling mg_start */
CivetServer* server = static_cast<CivetServer*>(mg_get_user_data(ctx));
if (server == nullptr) {
return 0;
}
std::shared_ptr<logging::Logger>* logger = static_cast<std::shared_ptr<logging::Logger>*>(const_cast<void*>(server->getUserContext()));
if (logger == nullptr) {
return 0;
}
logging::LOG_DEBUG((*logger)) << "CivetWeb access: " << message;
} catch (...) {
}
return 0;
}
protected:
void notifyStop() override;
private:
// Logger
std::shared_ptr<logging::Logger> logger_;
CivetCallbacks callbacks_;
std::unique_ptr<CivetServer> server_;
std::unique_ptr<Handler> handler_;
std::string listeningPort;
};
REGISTER_RESOURCE(ListenHTTP, "Starts an HTTP Server and listens on a given base path to transform incoming requests into FlowFiles. The default URI of the Service will be "
"http://{hostname}:{port}/contentListener. Only HEAD, POST, and GET requests are supported. PUT, and DELETE will result in an error and the HTTP response status code 405."
" The response body text for all requests, by default, is empty (length of 0). A static response body can be set for a given URI by sending input files to ListenHTTP with "
"the http.type attribute set to response_body. The response body FlowFile filename attribute is appended to the Base Path property (separated by a /) when mapped to incoming requests. "
"The mime.type attribute of the response body FlowFile is used for the Content-type header in responses. Response body content can be cleared by sending an empty (size 0) "
"FlowFile for a given URI mapping.");
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */
#endif