blob: b00465bb3c5c7b0c29caa8917b0521d57eb5fed1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "iceberg/data/data_writer.h"
#include <map>
#include "iceberg/file_writer.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/util/macros.h"
namespace iceberg {
/// \brief Private implementation of DataWriter.
///
/// Owns the format-specific Writer obtained from WriterFactoryRegistry and,
/// once the writer has been closed, converts its metrics into a DataFile.
class DataWriter::Impl {
 public:
  /// \brief Open a format-specific writer for `options.path`.
  ///
  /// \param options Writer configuration; kept so Metadata() can fill in the
  ///        DataFile's path, format, partition and sort order.
  /// \return The implementation, or the error from WriterFactoryRegistry::Open.
  static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
    WriterOptions writer_options{
        .path = options.path,
        .schema = options.schema,
        .io = options.io,
        .properties = WriterProperties::FromMap(options.properties),
    };
    ICEBERG_ASSIGN_OR_RAISE(auto writer,
                            WriterFactoryRegistry::Open(options.format, writer_options));
    // The constructor is private, so std::make_unique cannot reach it.
    return std::unique_ptr<Impl>(new Impl(std::move(options), std::move(writer)));
  }

  /// \brief Append a batch of rows to the underlying writer.
  ///
  /// Returns an error once Close() has succeeded: the underlying writer has
  /// been finalized and must not receive further data.
  Status Write(ArrowArray* data) {
    ICEBERG_DCHECK(writer_, "Writer not initialized");
    ICEBERG_CHECK(!closed_, "Cannot write after closing the writer");
    return writer_->Write(data);
  }

  /// \brief Current length reported by the underlying writer.
  Result<int64_t> Length() const {
    ICEBERG_DCHECK(writer_, "Writer not initialized");
    return writer_->length();
  }

  /// \brief Close the underlying writer.
  ///
  /// Idempotent: after a successful close, further calls are no-ops. If the
  /// underlying Close() fails, the writer is not marked closed, so a later call
  /// will try again.
  Status Close() {
    ICEBERG_DCHECK(writer_, "Writer not initialized");
    if (closed_) {
      return {};
    }
    ICEBERG_RETURN_UNEXPECTED(writer_->Close());
    closed_ = true;
    return {};
  }

  /// \brief Build the single DataFile describing the written file.
  ///
  /// Must be called after Close(). Literal lower/upper bounds from the writer's
  /// metrics are serialized to their binary form. When the writer reports no
  /// row count, record_count is set to -1.
  Result<FileWriter::WriteResult> Metadata() {
    ICEBERG_DCHECK(writer_, "Writer not initialized");
    ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");
    ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
    ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
    auto split_offsets = writer_->split_offsets();

    // Lower bounds are serialized first, so their error (if any) wins.
    ICEBERG_ASSIGN_OR_RAISE(auto lower_bounds_map, SerializeBounds(metrics.lower_bounds));
    ICEBERG_ASSIGN_OR_RAISE(auto upper_bounds_map, SerializeBounds(metrics.upper_bounds));

    auto data_file = std::make_shared<DataFile>(DataFile{
        .content = DataFile::Content::kData,
        .file_path = options_.path,
        .file_format = options_.format,
        .partition = options_.partition,
        .record_count = metrics.row_count.value_or(-1),
        .file_size_in_bytes = length,
        .column_sizes = {metrics.column_sizes.begin(), metrics.column_sizes.end()},
        .value_counts = {metrics.value_counts.begin(), metrics.value_counts.end()},
        .null_value_counts = {metrics.null_value_counts.begin(),
                              metrics.null_value_counts.end()},
        .nan_value_counts = {metrics.nan_value_counts.begin(),
                             metrics.nan_value_counts.end()},
        .lower_bounds = std::move(lower_bounds_map),
        .upper_bounds = std::move(upper_bounds_map),
        .split_offsets = std::move(split_offsets),
        .sort_order_id = options_.sort_order_id,
    });
    FileWriter::WriteResult result;
    result.data_files.push_back(std::move(data_file));
    return result;
  }

 private:
  Impl(DataWriterOptions options, std::unique_ptr<Writer> writer)
      : options_(std::move(options)), writer_(std::move(writer)) {}

  /// \brief Serialize every literal bound to bytes, keyed by column id.
  ///
  /// \param bounds Map-like range of (column id, literal) pairs.
  /// \return The serialized map, or the first Serialize() error encountered.
  template <typename BoundsMap>
  static Result<std::map<int32_t, std::vector<uint8_t>>> SerializeBounds(
      const BoundsMap& bounds) {
    std::map<int32_t, std::vector<uint8_t>> serialized;
    for (const auto& [col_id, literal] : bounds) {
      ICEBERG_ASSIGN_OR_RAISE(auto bytes, literal.Serialize());
      serialized[col_id] = std::move(bytes);
    }
    return serialized;
  }

  DataWriterOptions options_;
  std::unique_ptr<Writer> writer_;
  // Set only after the underlying writer closed successfully.
  bool closed_ = false;
};
DataWriter::DataWriter(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
DataWriter::~DataWriter() = default;
/// \brief Create a DataWriter whose underlying format writer is already open.
///
/// \param options Writer configuration; copied into the implementation.
/// \return The new writer, or the error raised while opening the format writer.
Result<std::unique_ptr<DataWriter>> DataWriter::Make(const DataWriterOptions& options) {
  ICEBERG_ASSIGN_OR_RAISE(auto writer_impl, Impl::Make(options));
  // Wrap the new instance immediately so it is never held by a bare pointer.
  return std::unique_ptr<DataWriter>{new DataWriter(std::move(writer_impl))};
}
// The public DataWriter API is a thin facade: every call below delegates
// directly to the matching Impl method.

Status DataWriter::Write(ArrowArray* data) {
  return impl_->Write(data);
}

Result<int64_t> DataWriter::Length() const {
  return impl_->Length();
}

Status DataWriter::Close() {
  return impl_->Close();
}

Result<FileWriter::WriteResult> DataWriter::Metadata() {
  return impl_->Metadata();
}
} // namespace iceberg