blob: f35bcf836027db06314ae54bd18d11a8ea124506 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_HDFS_RCFILE_SCANNER_H
#define IMPALA_EXEC_HDFS_RCFILE_SCANNER_H
/// org.apache.hadoop.hive.ql.io.RCFile is the original RCFile implementation
/// and should be viewed as the canonical definition of this format. If
/// anything is unclear in this file you should consult the code in
/// org.apache.hadoop.hive.ql.io.RCFile.
//
/// The following is a pseudo-BNF grammar for RCFile. Comments are prefixed
/// with dashes:
//
/// rcfile ::=
/// <file-header>
/// <rcfile-rowgroup>+
//
/// file-header ::=
/// <file-version-header>
/// <file-key-class-name> (only exists if version is seq6)
/// <file-value-class-name> (only exists if version is seq6)
/// <file-is-compressed>
/// <file-is-block-compressed> (only exists if version is seq6)
/// [<file-compression-codec-class>]
/// <file-header-metadata>
/// <file-sync-hash>
//
/// -- The normative RCFile implementation included with Hive is actually
/// -- based on a modified version of Hadoop's SequenceFile code. Some
/// -- things which should have been modified were not, including the code
/// -- that writes out the file version header. Consequently, RCFile and
/// -- SequenceFile originally shared the same version header. A newer
/// -- release has created a unique version string.
//
/// file-version-header ::= Byte[4] {'S', 'E', 'Q', 6}
/// | Byte[4] {'R', 'C', 'F', 1}
//
/// -- The name of the Java class responsible for reading the key buffer
/// -- component of the rowgroup.
//
/// file-key-class-name ::=
/// Text {"org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer"}
//
/// -- The name of the Java class responsible for reading the value buffer
/// -- component of the rowgroup.
//
/// file-value-class-name ::=
/// Text {"org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer"}
//
/// -- Boolean variable indicating whether or not the file uses compression
/// -- for the key and column buffer sections.
//
/// file-is-compressed ::= Byte[1]
//
/// -- A boolean field indicating whether or not the file is block compressed.
/// -- This field is *always* false. According to comments in the original
/// -- RCFile implementation this field was retained for backwards
/// -- compatibility with the SequenceFile format.
//
/// file-is-block-compressed ::= Byte[1] {false}
//
/// -- The Java class name of the compression codec iff <file-is-compressed>
/// -- is true. The named class must implement
/// -- org.apache.hadoop.io.compress.CompressionCodec.
/// -- The expected value is org.apache.hadoop.io.compress.GzipCodec.
//
/// file-compression-codec-class ::= Text
//
/// -- A collection of key-value pairs defining metadata values for the
/// -- file. The Map is serialized using Hadoop's Writable encoding, i.e.
/// -- an Int corresponding to the number of key-value pairs, followed by
/// -- Text key and value pairs. The following metadata properties are
/// -- mandatory for all RCFiles:
/// --
/// -- hive.io.rcfile.column.number: the number of columns in the RCFile
//
/// file-header-metadata ::= Map<Text, Text>
//
/// -- A 16 byte marker that is generated by the writer. This marker appears
/// -- at regular intervals at the beginning of rowgroup-headers, and is
/// -- intended to enable readers to skip over corrupted rowgroups.
//
/// file-sync-hash ::= Byte[16]
//
/// -- Each row group is split into three sections: a header, a set of
/// -- key buffers, and a set of column buffers. The header section includes
/// -- an optional sync hash and the lengths of the row group and of its key
/// -- section. The key section begins with the number of rows in the row
/// -- group, followed by one key buffer per column. Each key buffer
/// -- consists of run-length encoding data which is used to decode
/// -- the lengths and offsets of individual fields in the corresponding column
/// -- buffer.
//
/// rcfile-rowgroup ::=
/// <rowgroup-header>
/// <rowgroup-key-data>
/// <rowgroup-column-buffers>
//
/// rowgroup-header ::=
/// [<rowgroup-sync-marker>, <rowgroup-sync-hash>]
/// <rowgroup-record-length>
/// <rowgroup-key-length>
/// <rowgroup-compressed-key-length>
//
/// -- rowgroup-key-data is compressed if the column data is compressed.
/// rowgroup-key-data ::=
/// <rowgroup-num-rows>
/// <rowgroup-key-buffers>
//
/// -- An integer (always -1) signaling the beginning of a sync-hash
/// -- field.
//
/// rowgroup-sync-marker ::= Int
//
/// -- A 16 byte sync field. This must match the <file-sync-hash> value read
/// -- in the file header.
//
/// rowgroup-sync-hash ::= Byte[16]
//
/// -- The record-length is the sum of the number of bytes used to store
/// -- the key and column parts, i.e. it is the total length of the current
/// -- rowgroup.
//
/// rowgroup-record-length ::= Int
//
/// -- Total length in bytes of the rowgroup's key sections.
//
/// rowgroup-key-length ::= Int
//
/// -- Total compressed length in bytes of the rowgroup's key sections.
//
/// rowgroup-compressed-key-length ::= Int
//
/// -- Number of rows in the current rowgroup.
//
/// rowgroup-num-rows ::= VInt
//
/// -- One or more column key buffers corresponding to each column
/// -- in the RCFile.
//
/// rowgroup-key-buffers ::= <rowgroup-key-buffer>+
//
/// -- The field lengths of each column buffer are stored in its key
/// -- buffer using a run-length encoding scheme that is intended to
/// -- reduce the cost of repeated field lengths. This mechanism is
/// -- described in more detail in the following entries.
//
/// rowgroup-key-buffer ::=
/// <column-buffer-length>
/// <column-buffer-uncompressed-length>
/// <column-key-buffer-length>
/// <column-key-buffer>
//
/// -- The serialized length on disk of the corresponding column buffer.
//
/// column-buffer-length ::= VInt
//
/// -- The uncompressed length of the corresponding column buffer. This
/// -- is equivalent to column-buffer-length if the RCFile is not compressed.
//
/// column-buffer-uncompressed-length ::= VInt
//
/// -- The length in bytes of the current column key buffer
//
/// column-key-buffer-length ::= VInt
//
/// -- The column-key-buffer contains a sequence of serialized VInt values
/// -- corresponding to the byte lengths of the serialized column fields
/// -- in the corresponding rowgroup-column-buffer. For example, consider
/// -- an integer column that contains the consecutive values 1, 2, 3, 44.
/// -- The RCFile format stores these values as strings in the column buffer,
/// -- e.g. "12344". The length of each column field is recorded in
/// -- the column-key-buffer as a sequence of VInts: 1,1,1,2. However,
/// -- if the same length occurs repeatedly, then the repeats that follow
/// -- the first occurrence are replaced by the bitwise complement (a
/// -- negative value) of their count, so 1,1,1,2 becomes 1,~2,2.
//
/// column-key-buffer ::= Byte[column-key-buffer-length]
//
/// rowgroup-column-buffers ::= <rowgroup-column-buffer>+
//
/// -- RCFile stores all column data as strings regardless of the
/// -- underlying column type. The strings are neither length-prefixed nor
/// -- null-terminated, and decoding them into individual fields requires
/// -- the use of the run-length information contained in the corresponding
/// -- column-key-buffer.
//
/// rowgroup-column-buffer ::= Byte[column-buffer-length]
//
/// Byte ::= An eight-bit byte
//
/// VInt ::= Hadoop variable-length integer (WritableUtils.writeVInt). Values
/// in the range [-112, 127] are stored in a single byte. Otherwise the first
/// byte encodes the sign and the number of bytes that follow (1-4 for an
/// Int), and the following bytes hold the value (one's complement if
/// negative) in big-endian order.
//
/// Int ::= A four-byte integer in big-endian format.
//
/// Text ::= VInt, Chars (Length prefixed UTF-8 characters)
//
/// The above file format is read in chunks. The "key" buffer is read. The "keys"
/// are really the lengths of the column data blocks and the lengths of the values
/// within those blocks. Using this information the column "buffers" (data)
/// that are needed by the query are read into a single buffer. Column data that
/// is not used by the query is skipped and not read from the file. The key data
/// and the column data may be compressed. The key data is compressed in a single
/// block while the column data is compressed separately by column.
#include "exec/base-sequence-scanner.h"
namespace impala {
struct HdfsFileDesc;
class HdfsScanNodeBase;
class TupleDescriptor;
class Tuple;
/// A scanner for reading RCFiles into tuples. It parses the layout described by the
/// grammar at the top of this file, reading only the column buffers that the query
/// materializes.
class HdfsRCFileScanner : public BaseSequenceScanner {
public:
HdfsRCFileScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
virtual ~HdfsRCFileScanner();
/// Opens the scanner on 'context'. The returned Status must be checked.
virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
/// Debug helper: writes a description of this scanner to 'out' at the given
/// 'indentation_level'.
void DebugString(int indentation_level, std::stringstream* out) const;
private:
/// The key class name located in the RCFile Header.
/// This is always "org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer"
static const char* const RCFILE_KEY_CLASS_NAME;
/// The value class name located in the RCFile Header.
/// This is always "org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer"
static const char* const RCFILE_VALUE_CLASS_NAME;
/// RCFile metadata key for determining the number of columns
/// present in the RCFile: "hive.io.rcfile.column.number"
static const char* const RCFILE_METADATA_KEY_NUM_COLS;
/// The four byte RCFile unique version header present at the beginning
/// of the file {'R', 'C', 'F', 1}
static const uint8_t RCFILE_VERSION_HEADER[4];
/// Upper limit on the number of columns the scanner accepts.
/// NOTE(review): presumably checked against the column count read by
/// ReadNumColumnsMetadata(); confirm in the .cc file.
static const int MAX_NCOLS;
/// Implementation of superclass functions.
/// NOTE(review): AllocateFileHeader() presumably returns an RcFileHeader (declared
/// below); confirm in the .cc file.
virtual FileHeader* AllocateFileHeader();
virtual Status ReadFileHeader() WARN_UNUSED_RESULT;
virtual Status InitNewRange() WARN_UNUSED_RESULT;
virtual Status ProcessRange(RowBatch* row_batch) WARN_UNUSED_RESULT;
/// This scanner always handles RC_FILE.
virtual THdfsFileFormat::type file_format() const { return THdfsFileFormat::RC_FILE; }
/// Reads the RCFile Header Metadata section in the current file to determine the number
/// of columns. Other pieces of the metadata are ignored.
Status ReadNumColumnsMetadata() WARN_UNUSED_RESULT;
/// Reads the rowgroup header starting after the sync.
/// Sets:
/// key_length_
/// compressed_key_length_
/// num_rows_
Status ReadRowGroupHeader() WARN_UNUSED_RESULT;
/// Read the rowgroup key buffers, decompress if necessary.
/// The "keys" are really the lengths for the column values. They
/// are read here and then used to decode the values in the column buffer.
/// Calls GetCurrentKeyBuffer for each column to process the key data.
Status ReadKeyBuffers() WARN_UNUSED_RESULT;
/// Process the current key buffer.
/// Inputs:
/// col_idx: column to process
/// skip_col_data: if true, just skip over the key data.
/// Input/Output:
/// key_buf_ptr: Pointer to the buffered file data, this will be moved
/// past the data for this column.
/// buf_length: Length of the buffer that will be read.
/// Sets the key-derived fields of columns_[col_idx], presumably:
/// buffer_len
/// uncompressed_buffer_len
/// key_buffer_len
/// key_buffer
/// start_offset
/// NOTE(review): unlike the other Status-returning methods this one is not marked
/// WARN_UNUSED_RESULT; callers should still check the returned Status.
Status GetCurrentKeyBuffer(
int col_idx, bool skip_col_data, uint8_t** key_buf_ptr, int buf_length);
/// Read the rowgroup column buffers
/// Sets:
/// row_group_buffer_: Fills the buffer with either file data or decompressed data
/// for the materialized columns.
Status ReadColumnBuffers() WARN_UNUSED_RESULT;
/// Look at the next field in the specified column buffer
/// Input:
/// col_idx: Column of the field.
/// Modifies (fields of columns_[col_idx]):
/// key_buffer_pos
/// current_field_len
/// current_field_len_rep
/// buffer_pos (presumably; confirm in the .cc file)
Status NextField(int col_idx) WARN_UNUSED_RESULT;
/// Read a row group (except for the sync marker and sync) into buffers.
/// Calls:
/// ReadRowGroupHeader()
/// ReadKeyBuffers()
/// ReadColumnBuffers()
Status StartRowGroup() WARN_UNUSED_RESULT;
/// Move to next row. Calls NextField on each column that we are reading.
/// Modifies:
/// row_pos_
Status NextRow() WARN_UNUSED_RESULT;
/// Builds the error Status reported when bad data is encountered for column
/// 'col_idx'. The message is assembled on a formatted stream.
/// Input:
/// col_idx: column to describe in the error message.
/// Output:
/// Error status containing the formatted message.
/// NOTE(review): not marked WARN_UNUSED_RESULT, unlike the other Status-returning
/// methods.
Status BadColumnInfo(int col_idx);
/// On-disk format version, determined by the file-version-header.
enum Version {
SEQ6, // Version for sequence file and pre hive-0.9 rc files
RCF1 // The version post hive-0.9 which uses a new header
};
/// Data read from the file header that stays fixed for the whole file.
/// This struct is shared between scan ranges.
struct RcFileHeader : public BaseSequenceScanner::FileHeader {
/// RC file version
Version version;
/// The number of columns in the file (may be more than the number of columns in the
/// table metadata)
int num_cols;
};
/// Struct encapsulating all the state for parsing a single column from a row
/// group
struct ColumnInfo {
/// If true, this column should be materialized, otherwise, it can be skipped
bool materialize_column;
/// Compressed (on-disk) and uncompressed byte lengths for this column's buffer
int32_t buffer_len;
int32_t uncompressed_buffer_len;
/// Length in bytes of this column's key buffer.
int32_t key_buffer_len;
/// This is a ptr into the scanner's key_buffer_ for this column.
uint8_t* key_buffer;
/// Length of the key buffer
/// NOTE(review): appears to overlap with key_buffer_len; confirm which one the .cc
/// file uses for bounds checks.
int32_t buf_length;
/// Current position in the key buffer
int32_t key_buffer_pos;
/// Offset into row_group_buffer_ for the start of this column.
int32_t start_offset;
/// Offset from the start of the column for the next field in the column
int32_t buffer_pos;
/// RLE: Length of the current field
int32_t current_field_len;
/// RLE: Repetition count of the current field
int32_t current_field_len_rep;
};
/// Vector of column descriptions for each column in the file (i.e., may contain a
/// different number of non-partition columns than are in the table metadata). Indexed
/// by column index, including non-materialized columns.
std::vector<ColumnInfo> columns_;
/// Buffer for copying key buffers. This buffer is reused between row groups.
std::vector<uint8_t> key_buffer_;
/// number of rows in this rowgroup object
int num_rows_ = 0;
/// Current row position in this rowgroup.
/// This value is incremented each time NextRow() is called.
int row_pos_ = 0;
/// Size of the row group's key buffers.
/// Read from the row group header.
int key_length_ = -1;
/// Compressed size of the row group's key buffers.
/// Read from the row group header.
int compressed_key_length_ = -1;
/// If true, the row_group_buffer_ can be reused across row groups, otherwise,
/// it (more specifically the data_buffer_pool_ that allocated the row_group_buffer_)
/// must be attached to the row batch.
bool reuse_row_group_buffer_ = false;
/// Buffer containing the entire row group. We allocate a buffer for the entire
/// row group, skipping non-materialized columns.
uint8_t* row_group_buffer_ = nullptr;
/// Sum of the bytes lengths of the materialized columns in the current row group. This
/// is the number of valid bytes in row_group_buffer_.
int64_t row_group_length_ = 0;
/// This is the allocated size of 'row_group_buffer_'. 'row_group_buffer_' is reused
/// across row groups and will grow as necessary.
int64_t row_group_buffer_size_ = 0;
};
}
#endif