| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #ifndef IMPALA_EXEC_HDFS_AVRO_WRITER_H |
| #define IMPALA_EXEC_HDFS_AVRO_WRITER_H |
| |
| #include <hdfs.h> |
| #include <sstream> |
| #include <string> |
| |
| #include "common/status.h" |
| #include "exec/hdfs-table-writer.h" |
| #include "runtime/mem-pool.h" |
| #include "util/codec.h" |
| #include "exec/write-stream.h" |
| |
| namespace impala { |
| |
| /// Forward declarations of project types. The ones this header references |
| /// (ColumnType, HdfsTableSink, RuntimeState, TupleRow, OutputPartition) appear only as |
| /// pointer or reference parameters, so their full definitions are not needed here. |
| /// NOTE(review): ScalarExprEvaluator, TupleDescriptor and StringValue are not referenced |
| /// by the declarations in this header; presumably they are kept for the implementation |
| /// file -- confirm before removing. |
| struct ColumnType; |
| class HdfsTableSink; |
| class RuntimeState; |
| class ScalarExprEvaluator; |
| class TupleDescriptor; |
| class TupleRow; |
| struct OutputPartition; |
| struct StringValue; |
| |
| /// Consumes rows and writes them into an Avro file in HDFS. |
| /// Each Avro file contains a sequence of data blocks of records (rows). The file |
| /// metadata specifies the schema of the records in addition to the name of the codec, |
| /// if any, used to compress blocks. The structure is: |
| ///   [ Metadata ] |
| ///   [ Sync Marker ] |
| ///   [ Data Block ] |
| ///   ... |
| ///   [ Data Block ] |
| /// |
| /// Each Data Block consists of: |
| ///   [ Number of Rows in Block ] |
| ///   [ Size of serialized objects, after compression ] |
| ///   [ Serialized objects, compressed ] |
| ///   [ Sync Marker ] |
| /// |
| /// If compression is used, each block is compressed individually. The block size defaults |
| /// to about 64KB before compression. |
| /// This writer implements the Avro 1.7.7 spec: |
| /// http://avro.apache.org/docs/1.7.7/spec.html |
| class HdfsAvroTableWriter : public HdfsTableWriter { |
|  public: |
|   /// Declared here and defined out of line. |
|   /// NOTE(review): presumably forwards its arguments to the HdfsTableWriter base class; |
|   /// confirm in the implementation file. |
|   HdfsAvroTableWriter(HdfsTableSink* parent, |
|       RuntimeState* state, OutputPartition* output, |
|       const HdfsPartitionDescriptor* partition, |
|       const HdfsTableDescriptor* table_desc); |
| |
|   virtual ~HdfsAvroTableWriter() { } |
| |
|   /// One-time setup of the writer; defined out of line. |
|   virtual Status Init() override; |
| |
|   /// Flushes whatever is still buffered in out_ and returns the status of that flush. |
|   virtual Status Finalize() override { return Flush(); } |
| |
|   /// Starts a new file by writing the Avro file header; returns the status of that write. |
|   virtual Status InitNewFile() override { return WriteFileHeader(); } |
| |
|   /// Releases the writer's resources; defined out of line. |
|   virtual void Close() override; |
| |
|   /// Always returns 0. |
|   /// NOTE(review): presumably 0 means "no writer-specific block size, use the default"; |
|   /// confirm against HdfsTableWriter. |
|   virtual uint64_t default_block_size() const override { return 0; } |
| |
|   /// File name extension used for files produced by this writer. |
|   virtual std::string file_extension() const override { return "avro"; } |
| |
|   /// Outputs the given rows into an HDFS Avro file. The rows are buffered |
|   /// to fill an Avro data block. |
|   /// NOTE(review): the exact meaning of 'row_group_indices' and 'new_file' is defined by |
|   /// the HdfsTableWriter interface, which is not visible in this header. |
|   virtual Status AppendRows(RowBatch* rows, |
|       const std::vector<int32_t>& row_group_indices, bool* new_file) override; |
| |
|  private: |
|   /// Processes a single row, appending to out_ |
|   void ConsumeRow(TupleRow* row); |
| |
|   /// Adds an encoded field to out_ |
|   inline void AppendField(const ColumnType& type, const void* value); |
| |
|   /// Writes the Avro file header to HDFS |
|   Status WriteFileHeader() WARN_UNUSED_RESULT; |
| |
|   /// Writes the contents of out_ to HDFS as a single Avro file block. |
|   /// Returns an error if write to HDFS fails. |
|   Status Flush() WARN_UNUSED_RESULT; |
| |
|   /// Buffer which holds accumulated output |
|   WriteStream out_; |
| |
|   /// Memory pool used by codec to allocate output buffer. |
|   /// Owned by this class. Initialized using parent's memtracker. |
|   boost::scoped_ptr<MemPool> mem_pool_; |
| |
|   /// Number of rows consumed since last flush. |
|   /// The default member initializer keeps the counter well-defined even if a |
|   /// constructor does not set it explicitly. |
|   uint64_t unflushed_rows_ = 0; |
| |
|   /// Name of codec, only set if codec_type_ != NONE |
|   std::string codec_name_; |
| |
|   /// Type of the codec, will be NONE if no compression is used. |
|   /// Defaults to NONE so the value is never indeterminate before Init(). |
|   THdfsCompression::type codec_type_ = THdfsCompression::NONE; |
| |
|   /// The codec for compressing, only set if codec_type_ != NONE |
|   boost::scoped_ptr<Codec> compressor_; |
| |
|   /// 16 byte sync marker (a uuid) |
|   std::string sync_marker_; |
| }; |
| |
| } // namespace impala |
| #endif |