blob: 566a4c862ad3dac639ca0367cd44ec86a21f1fbd [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <memory>
#include <utility>
#include "io/OutputStream.h"
#include "io/ArchiveStream.h"
#include "core/logging/LoggerConfiguration.h"
#include "archive_entry.h"
#include "archive.h"
namespace org::apache::nifi::minifi::io {
class ReadArchiveStreamImpl : public ReadArchiveStream {
struct archive_read_deleter {
int operator()(struct archive* ptr) const {
return archive_read_free(ptr);
}
};
using archive_ptr = std::unique_ptr<struct archive, archive_read_deleter>;
class BufferedReader {
public:
explicit BufferedReader(std::shared_ptr<InputStream> input) : input_(std::move(input)) {}
std::optional<gsl::span<const std::byte>> readChunk() {
size_t result = input_->read(buffer_);
if (io::isError(result)) {
return std::nullopt;
}
return gsl::make_span(buffer_).subspan(0, result);
}
private:
std::shared_ptr<InputStream> input_;
std::array<std::byte, 4096> buffer_{};
};
archive_ptr createReadArchive();
public:
explicit ReadArchiveStreamImpl(std::shared_ptr<InputStream> input) : reader_(std::move(input)) {
arch_ = createReadArchive();
}
std::optional<EntryInfo> nextEntry() override;
using InputStream::read;
size_t read(gsl::span<std::byte> out_buffer) override;
private:
static la_ssize_t archive_read(struct archive* archive, void *context, const void **buff) {
auto* const input = reinterpret_cast<BufferedReader*>(context);
auto opt_buffer = input->readChunk();
if (!opt_buffer) {
archive_set_error(archive, EIO, "Error reading archive");
return -1;
}
*buff = opt_buffer->data();
return gsl::narrow<la_ssize_t>(opt_buffer->size());
}
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ReadArchiveStream>::getLogger();
BufferedReader reader_;
archive_ptr arch_;
std::optional<size_t> entry_size_;
};
} // namespace org::apache::nifi::minifi::io