blob: 1382e6a9cac421bf279e4794dd36cd7ab60ca9a7 [file] [log] [blame]
/*
* Copyright 2024-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "paimon/common/io/offset_input_stream.h"
#include <utility>
#include "fmt/format.h"
#include "paimon/macros.h"
namespace paimon {
Result<std::unique_ptr<OffsetInputStream>> OffsetInputStream::Create(
const std::shared_ptr<InputStream>& wrapped, int64_t length, int64_t offset) {
if (PAIMON_UNLIKELY(wrapped == nullptr)) {
return Status::Invalid("input stream is null pointer");
}
if (PAIMON_UNLIKELY(offset < 0)) {
return Status::Invalid(fmt::format("offset {} is less than 0", offset));
}
if (PAIMON_UNLIKELY(length < -1)) {
return Status::Invalid(fmt::format("length {} is less than -1", length));
}
PAIMON_ASSIGN_OR_RAISE(uint64_t total_length, wrapped->Length());
if (PAIMON_UNLIKELY((uint64_t)offset > total_length)) {
return Status::Invalid(
fmt::format("offset {} exceed total length {}", offset, total_length));
}
if (length == -1) {
// length == -1 means it's dynamic length, should read to the end
length = total_length - offset;
}
if (PAIMON_UNLIKELY((uint64_t)offset + (uint64_t)length > total_length)) {
return Status::Invalid(fmt::format("offset {} + length {} exceed total length {}", offset,
length, total_length));
}
PAIMON_RETURN_NOT_OK(wrapped->Seek(offset, SeekOrigin::FS_SEEK_SET));
return std::unique_ptr<OffsetInputStream>(
new OffsetInputStream(std::move(wrapped), length, offset));
}
OffsetInputStream::OffsetInputStream(const std::shared_ptr<InputStream>& wrapped, int64_t length,
int64_t offset)
: wrapped_(wrapped), length_(length), offset_(offset) {}
Status OffsetInputStream::Seek(int64_t offset, SeekOrigin origin) {
switch (origin) {
case SeekOrigin::FS_SEEK_SET: {
inner_position_ = offset;
PAIMON_RETURN_NOT_OK(AssertBoundary(inner_position_));
return wrapped_->Seek(offset_ + inner_position_, SeekOrigin::FS_SEEK_SET);
}
case SeekOrigin::FS_SEEK_CUR: {
inner_position_ += offset;
PAIMON_RETURN_NOT_OK(AssertBoundary(inner_position_));
return wrapped_->Seek(offset, SeekOrigin::FS_SEEK_CUR);
}
case SeekOrigin::FS_SEEK_END: {
inner_position_ = length_ + offset;
PAIMON_RETURN_NOT_OK(AssertBoundary(inner_position_));
return wrapped_->Seek(offset_ + inner_position_, SeekOrigin::FS_SEEK_SET);
}
default:
return Status::Invalid(
"invalid SeekOrigin, only support FS_SEEK_SET, FS_SEEK_CUR, and FS_SEEK_END");
}
return Status::OK();
}
Result<int32_t> OffsetInputStream::Read(char* buffer, uint32_t size) {
PAIMON_RETURN_NOT_OK(AssertBoundary(inner_position_ + size));
inner_position_ += size;
return wrapped_->Read(buffer, size);
}
Result<int32_t> OffsetInputStream::Read(char* buffer, uint32_t size, uint64_t offset) {
PAIMON_RETURN_NOT_OK(AssertBoundary(offset));
PAIMON_RETURN_NOT_OK(AssertBoundary(offset + size));
return wrapped_->Read(buffer, size, offset_ + offset);
}
void OffsetInputStream::ReadAsync(char* buffer, uint32_t size, uint64_t offset,
std::function<void(Status)>&& callback) {
auto status = AssertBoundary(offset);
if (!status.ok()) {
callback(status);
return;
}
status = AssertBoundary(offset + size);
if (!status.ok()) {
callback(status);
return;
}
wrapped_->ReadAsync(buffer, size, offset_ + offset, std::move(callback));
}
Status OffsetInputStream::Close() {
return wrapped_->Close();
}
Result<std::string> OffsetInputStream::GetUri() const {
return wrapped_->GetUri();
}
Result<int64_t> OffsetInputStream::GetPos() const {
return inner_position_;
}
Result<uint64_t> OffsetInputStream::Length() const {
return length_;
}
Status OffsetInputStream::AssertBoundary(int32_t inner_pos) const {
if (inner_pos < 0 || inner_pos > length_) {
return Status::Invalid(
fmt::format("OffsetInputStream assert boundary failed: inner pos {} exceed length {}",
inner_pos, length_));
}
return Status::OK();
}
} // namespace paimon