blob: ae4d565899ea437ce4f00ce199b407f4cd671287 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <vector>
#include "common/hdfs.h"
#include "common/status.h"
#include "gen-cpp/control_service.pb.h"
namespace impala {
class HdfsPartitionDescriptor;
class HdfsTableDescriptor;
class TableSinkBase;
struct OutputPartition;
class RowBatch;
class RuntimeState;
class ScalarExprEvaluator;
/// Per column statistics to be written to Iceberg manifest files (for each data file).
/// min_binary and max_binary members contain Single-Value serialized lower and upper
/// column stats
/// (
struct IcebergColumnStats {
bool has_min_max_values;
std::string min_binary;
std::string max_binary;
int64_t value_count;
int64_t null_count;
int64_t column_size;
typedef std::unordered_map<int, IcebergColumnStats> IcebergFileStats;
/// Pure virtual class for writing to hdfs table partition files.
/// Subclasses implement the code needed to write to a specific file type.
/// A subclass needs to implement functions to format and add rows to the file
/// and to do whatever processing is needed prior to closing the file.
class HdfsTableWriter {
/// The implementation of a writer may reference the parameters to the constructor
/// during the lifetime of the object.
/// output_partition -- Information on the output partition file.
/// partition -- the descriptor for the partition being written
/// table_desc -- the descriptor for the table being written.
HdfsTableWriter(TableSinkBase* parent,
RuntimeState* state, OutputPartition* output_partition,
const HdfsPartitionDescriptor* partition_desc,
const HdfsTableDescriptor* table_desc);
virtual ~HdfsTableWriter() { }
/// The sequence of calls to this object are:
/// 1. Init()
/// 2. InitNewFile()
/// 3. AppendRows() - called repeatedly
/// 4. Finalize()
/// For files formats that are splittable (and therefore can be written to an
/// arbitrarily large file), 1-4 is called once.
/// For files formats that are not splittable (i.e. columnar formats, compressed
/// text), 1) is called once and 2-4) is called repeatedly for each file.
/// Do initialization of writer.
virtual Status Init() WARN_UNUSED_RESULT = 0;
/// Called when a new file is started.
virtual Status InitNewFile() WARN_UNUSED_RESULT = 0;
/// Appends rows of 'batch' to the partition that are selected via 'row_group_indices',
/// and if the latter is empty, appends every row.
/// If the current file is full, the writer stops appending and returns with
/// *new_file == true. A new file will be opened and the same row batch will be passed
/// again. The writer must track how much of the batch it had already processed asking
/// for a new file. Otherwise the writer will return with *newfile == false.
virtual Status AppendRows(RowBatch* batch,
const std::vector<int32_t>& row_group_indices,
bool* new_file) WARN_UNUSED_RESULT = 0;
/// Finalize this partition. The writer needs to finish processing
/// all data have written out after the return from this call.
/// This is called once for each call to InitNewFile()
virtual Status Finalize() WARN_UNUSED_RESULT = 0;
/// Called once when this writer should cleanup any resources.
virtual void Close() = 0;
/// Returns the stats for this writer.
const DmlStatsPB& stats() const { return stats_; }
/// Returns the stats for the latest iceberg file written by this writer.
const IcebergFileStats& iceberg_file_stats() const { return iceberg_file_stats_; }
/// Default block size to use for this file format. If the file format doesn't
/// care, it should return 0 and the hdfs config default will be used.
virtual uint64_t default_block_size() const = 0;
/// Returns the file extension for this writer.
virtual std::string file_extension() const = 0;
/// Size to buffer output before calling Write() (which calls hdfsWrite), in bytes
/// to minimize the overhead of Write()
static const int HDFS_FLUSH_WRITE_SIZE = 50 * 1024;
/// Write to the current hdfs file.
Status Write(const char* data, int32_t len) {
return Write(reinterpret_cast<const uint8_t*>(data), len);
Status Write(const uint8_t* data, int32_t len);
template<typename T>
Status Write(T v) {
return Write(reinterpret_cast<uint8_t*>(&v), sizeof(T));
/// Parent table sink object
TableSinkBase* parent_;
/// Runtime state.
RuntimeState* state_;
/// Structure describing partition written to by this writer.
/// NOTE: OutputPartition is usually accessed with a unique_ptr. It is safe to use
/// a raw pointer here, because the OutputPartition maintains a scoped_ptr to
/// the HdfsTableWriter objects. This will never outlive the OutputPartition.
OutputPartition* output_;
/// Table descriptor of table to be written.
const HdfsTableDescriptor* table_desc_;
/// Reference to the evaluators of expressions which generate the output value.
/// The evaluators are owned by sink which owns this table writer.
const std::vector<ScalarExprEvaluator*>& output_expr_evals_;
/// Subclass should populate any file format specific stats.
DmlStatsPB stats_;
/// Contains the per-column stats for the latest file written by this writer.
/// Used with iceberg only.
IcebergFileStats iceberg_file_stats_;