blob: 5415979717dc4d3b28ffcdc0baa436e492e402d5 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_
#define EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_
#include <deque>
#include <thread>
#include <mutex>
#include <vector>
#include <condition_variable>
#include "core/logging/LoggerConfiguration.h"
#include "utils/ByteArrayCallback.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace utils {
/**
* The original class here was deadlock-prone, undocumented and was a smorgasbord of multithreading primitives used inconsistently.
* This is a rewrite based on the contract inferred from this class's usage in utils::HTTPClient
* through HTTPStream and the non-buggy part of the behaviour of the original class.
* Based on these:
* - this class provides a mechanism through which chunks of data can be inserted on a producer thread, while a
* consumer thread simultaneously reads this stream of data in CURLOPT_READFUNCTION to supply a POST or PUT request
* body with data utilizing HTTP chunked transfer encoding
* - once a chunk of data is completely processed, we can discard it (i.e. the consumer will not seek backwards)
* - if we expect that more data will be available, but there is none available at the current time, we should block
* the consumer thread until either new data becomes available, or we are closed, signaling that there will be no
* new data
* - we signal that we have provided all data by returning a nullptr from getBuffer. After this no further calls asking
* for data should be made on us
* - we keep a current buffer and change this buffer once the consumer requests an offset which can no longer be served
* by the current buffer
* - because of this, all functions that request data at a specific offset are implicit seeks and potentially modify
* the current buffer
*/
class HttpStreamingCallback : public ByteInputCallBack {
public:
HttpStreamingCallback()
: logger_(logging::LoggerFactory<HttpStreamingCallback>::getLogger()),
is_alive_(true),
total_bytes_loaded_(0U),
current_buffer_start_(0U),
current_pos_(0U),
ptr_(nullptr) {
}
virtual ~HttpStreamingCallback() = default;
void close() {
logger_->log_trace("close() called");
std::unique_lock<std::mutex> lock(mutex_);
is_alive_ = false;
cv.notify_all();
}
void seek(size_t pos) override {
logger_->log_trace("seek(pos: %zu) called", pos);
std::unique_lock<std::mutex> lock(mutex_);
seekInner(lock, pos);
}
int64_t process(std::shared_ptr<io::BaseStream> stream) override {
std::vector<char> vec;
if (stream->size() > 0) {
vec.resize(stream->size());
stream->read(reinterpret_cast<uint8_t*>(vec.data()), gsl::narrow<int>(stream->size()));
}
return processInner(std::move(vec));
}
virtual int64_t process(const uint8_t* data, size_t size) {
std::vector<char> vec;
vec.resize(size);
memcpy(vec.data(), reinterpret_cast<const char*>(data), size);
return processInner(std::move(vec));
}
void write(std::string content) override {
std::vector<char> vec;
vec.assign(content.begin(), content.end());
(void) processInner(std::move(vec));
}
char* getBuffer(size_t pos) override {
logger_->log_trace("getBuffer(pos: %zu) called", pos);
std::unique_lock<std::mutex> lock(mutex_);
seekInner(lock, pos);
if (ptr_ == nullptr) {
return nullptr;
}
size_t relative_pos = pos - current_buffer_start_;
current_pos_ = pos;
return ptr_ + relative_pos;
}
const size_t getRemaining(size_t pos) override {
logger_->log_trace("getRemaining(pos: %zu) called", pos);
std::unique_lock<std::mutex> lock(mutex_);
seekInner(lock, pos);
return total_bytes_loaded_ - pos;
}
const size_t getBufferSize() override {
logger_->log_trace("getBufferSize() called");
std::unique_lock<std::mutex> lock(mutex_);
// This is needed to make sure that the first buffer is loaded
seekInner(lock, current_pos_);
return total_bytes_loaded_;
}
private:
/**
* Loads the next available buffer
* @param lock unique_lock which *must* own the lock
*/
inline void loadNextBuffer(std::unique_lock<std::mutex>& lock) {
cv.wait(lock, [&] {
return !byte_arrays_.empty() || !is_alive_;
});
if (byte_arrays_.empty()) {
logger_->log_trace("loadNextBuffer() ran out of buffers");
ptr_ = nullptr;
} else {
current_vec_ = std::move(byte_arrays_.front());
byte_arrays_.pop_front();
ptr_ = current_vec_.data();
current_buffer_start_ = total_bytes_loaded_;
current_pos_ = current_buffer_start_;
total_bytes_loaded_ += current_vec_.size();
logger_->log_trace("loadNextBuffer() loaded new buffer, ptr_: %p, size: %zu, current_buffer_start_: %zu, current_pos_: %zu, total_bytes_loaded_: %zu",
ptr_,
current_vec_.size(),
current_buffer_start_,
current_pos_,
total_bytes_loaded_);
}
}
/**
* Common implementation for placing a buffer into the queue
* @param vec the buffer to be inserted
* @return the number of bytes processed (the size of vec)
*/
int64_t processInner(std::vector<char>&& vec) {
size_t size = vec.size();
logger_->log_trace("processInner() called, vec.data(): %p, vec.size(): %zu", vec.data(), size);
if (size == 0U) {
return 0U;
}
std::unique_lock<std::mutex> lock(mutex_);
byte_arrays_.emplace_back(std::move(vec));
cv.notify_all();
return size;
}
/**
* Seeks to the specified position
* @param lock unique_lock which *must* own the lock
* @param pos position to seek to
*/
void seekInner(std::unique_lock<std::mutex>& lock, size_t pos) {
logger_->log_trace("seekInner() called, current_pos_: %zu, pos: %zu", current_pos_, pos);
if (pos < current_pos_) {
const std::string errstr = "Seeking backwards is not supported, tried to seek from " + std::to_string(current_pos_) + " to " + std::to_string(pos);
logger_->log_error("%s", errstr);
throw std::logic_error(errstr);
}
while ((pos - current_buffer_start_) >= current_vec_.size()) {
loadNextBuffer(lock);
if (ptr_ == nullptr) {
break;
}
}
}
std::shared_ptr<logging::Logger> logger_;
std::mutex mutex_;
std::condition_variable cv;
bool is_alive_;
size_t total_bytes_loaded_;
size_t current_buffer_start_;
size_t current_pos_;
std::deque<std::vector<char>> byte_arrays_;
std::vector<char> current_vec_;
char *ptr_;
};
} /* namespace utils */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */
#endif /* EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_ */