| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <array> |
| #include <chrono> |
| #include <cstdint> |
| #include <memory> |
| #include <optional> |
| #include <string> |
| #include <string_view> |
| #include <vector> |
| |
| #include "arrow/util/span.h" |
| |
| #include "parquet/column_writer.h" |
| #include "parquet/file_writer.h" |
| |
| namespace parquet { |
| |
| /// \brief A class for writing Parquet files using an output stream type API. |
| /// |
| /// The values given must be of the correct type i.e. the type must |
| /// match the file schema exactly otherwise a ParquetException will be |
| /// thrown. |
| /// |
| /// The user must explicitly indicate the end of the row using the |
| /// EndRow() function or EndRow output manipulator. |
| /// |
| /// A maximum row group size can be configured, the default size is |
| /// 512MB. Alternatively the row group size can be set to zero and the |
| /// user can create new row groups by calling the EndRowGroup() |
| /// function or using the EndRowGroup output manipulator. |
| /// |
| /// Required and optional fields are supported: |
| /// - Required fields are written using operator<<(T) |
| /// - Optional fields are written using |
| /// operator<<(std::optional<T>). |
| /// |
| /// Note that operator<<(T) can be used to write optional fields. |
| /// |
| /// Similarly, operator<<(std::optional<T>) can be used to |
| /// write required fields. However if the optional parameter does not |
| /// have a value (i.e. it is nullopt) then a ParquetException will be |
| /// raised. |
| /// |
| /// Currently there is no support for repeated fields. |
| /// |
| class PARQUET_EXPORT StreamWriter { |
| public: |
| template <typename T> |
| using optional = ::std::optional<T>; |
| |
| // N.B. Default constructed objects are not usable. This |
| // constructor is provided so that the object may be move |
| // assigned afterwards. |
| StreamWriter() = default; |
| |
| explicit StreamWriter(std::unique_ptr<ParquetFileWriter> writer); |
| |
| ~StreamWriter() = default; |
| |
| static void SetDefaultMaxRowGroupSize(int64_t max_size); |
| |
| void SetMaxRowGroupSize(int64_t max_size); |
| |
| int current_column() const { return column_index_; } |
| |
| int64_t current_row() const { return current_row_; } |
| |
| int num_columns() const; |
| |
| // Moving is possible. |
| StreamWriter(StreamWriter&&) = default; |
| StreamWriter& operator=(StreamWriter&&) = default; |
| |
| // Copying is not allowed. |
| StreamWriter(const StreamWriter&) = delete; |
| StreamWriter& operator=(const StreamWriter&) = delete; |
| |
| /// \brief Output operators for required fields. |
| /// These can also be used for optional fields when a value must be set. |
| StreamWriter& operator<<(bool v); |
| |
| StreamWriter& operator<<(int8_t v); |
| |
| StreamWriter& operator<<(uint8_t v); |
| |
| StreamWriter& operator<<(int16_t v); |
| |
| StreamWriter& operator<<(uint16_t v); |
| |
| StreamWriter& operator<<(int32_t v); |
| |
| StreamWriter& operator<<(uint32_t v); |
| |
| StreamWriter& operator<<(int64_t v); |
| |
| StreamWriter& operator<<(uint64_t v); |
| |
| StreamWriter& operator<<(const std::chrono::milliseconds& v); |
| |
| StreamWriter& operator<<(const std::chrono::microseconds& v); |
| |
| StreamWriter& operator<<(float v); |
| |
| StreamWriter& operator<<(double v); |
| |
| StreamWriter& operator<<(char v); |
| |
| /// \brief Helper class to write fixed length strings. |
| /// This is useful as the standard string view (such as |
| /// std::string_view) is for variable length data. |
| struct PARQUET_EXPORT FixedStringView { |
| FixedStringView() = default; |
| |
| explicit FixedStringView(const char* data_ptr); |
| |
| FixedStringView(const char* data_ptr, std::size_t data_len); |
| |
| const char* data{NULLPTR}; |
| std::size_t size{0}; |
| }; |
| |
| /// \brief Output operators for fixed length strings. |
| template <int N> |
| StreamWriter& operator<<(const char (&v)[N]) { |
| return WriteFixedLength(v, N); |
| } |
| template <std::size_t N> |
| StreamWriter& operator<<(const std::array<char, N>& v) { |
| return WriteFixedLength(v.data(), N); |
| } |
| StreamWriter& operator<<(FixedStringView v); |
| |
| /// \brief Output operators for variable length strings. |
| StreamWriter& operator<<(const char* v); |
| StreamWriter& operator<<(const std::string& v); |
| StreamWriter& operator<<(::std::string_view v); |
| |
| /// \brief Helper class to write variable length raw data. |
| using RawDataView = ::arrow::util::span<const uint8_t>; |
| |
| /// \brief Output operators for variable length raw data. |
| StreamWriter& operator<<(RawDataView v); |
| |
| /// \brief Output operator for optional fields. |
| template <typename T> |
| StreamWriter& operator<<(const optional<T>& v) { |
| if (v) { |
| return operator<<(*v); |
| } |
| SkipOptionalColumn(); |
| return *this; |
| } |
| |
| /// \brief Skip the next N columns of optional data. If there are |
| /// less than N columns remaining then the excess columns are |
| /// ignored. |
| /// \throws ParquetException if there is an attempt to skip any |
| /// required column. |
| /// \return Number of columns actually skipped. |
| int64_t SkipColumns(int num_columns_to_skip); |
| |
| /// \brief Terminate the current row and advance to next one. |
| /// \throws ParquetException if all columns in the row were not |
| /// written or skipped. |
| void EndRow(); |
| |
| /// \brief Terminate the current row group and create new one. |
| void EndRowGroup(); |
| |
| protected: |
| template <typename WriterType, typename T> |
| StreamWriter& Write(const T v) { |
| auto writer = static_cast<WriterType*>(row_group_writer_->column(column_index_++)); |
| |
| writer->WriteBatch(kBatchSizeOne, &kDefLevelOne, &kRepLevelZero, &v); |
| |
| if (max_row_group_size_ > 0) { |
| row_group_size_ += writer->estimated_buffered_value_bytes(); |
| } |
| return *this; |
| } |
| |
| StreamWriter& WriteVariableLength(const char* data_ptr, std::size_t data_len, |
| ConvertedType::type converted_type); |
| |
| StreamWriter& WriteFixedLength(const char* data_ptr, std::size_t data_len); |
| |
| void CheckColumn(Type::type physical_type, ConvertedType::type converted_type, |
| int length = -1); |
| |
| /// \brief Skip the next column which must be optional. |
| /// \throws ParquetException if the next column does not exist or is |
| /// not optional. |
| void SkipOptionalColumn(); |
| |
| void WriteNullValue(ColumnWriter* writer); |
| |
| private: |
| using node_ptr_type = std::shared_ptr<schema::PrimitiveNode>; |
| |
| struct null_deleter { |
| void operator()(void*) {} |
| }; |
| |
| int32_t column_index_{0}; |
| int64_t current_row_{0}; |
| int64_t row_group_size_{0}; |
| int64_t max_row_group_size_{default_row_group_size_}; |
| |
| std::unique_ptr<ParquetFileWriter> file_writer_; |
| std::unique_ptr<RowGroupWriter, null_deleter> row_group_writer_; |
| std::vector<node_ptr_type> nodes_; |
| |
| static constexpr int16_t kDefLevelZero = 0; |
| static constexpr int16_t kDefLevelOne = 1; |
| static constexpr int16_t kRepLevelZero = 0; |
| static constexpr int64_t kBatchSizeOne = 1; |
| |
| static int64_t default_row_group_size_; |
| }; |
| |
| struct PARQUET_EXPORT EndRowType {}; |
| constexpr EndRowType EndRow = {}; |
| |
| struct PARQUET_EXPORT EndRowGroupType {}; |
| constexpr EndRowGroupType EndRowGroup = {}; |
| |
| PARQUET_EXPORT |
| StreamWriter& operator<<(StreamWriter&, EndRowType); |
| |
| PARQUET_EXPORT |
| StreamWriter& operator<<(StreamWriter&, EndRowGroupType); |
| |
| } // namespace parquet |