// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_HDFS_SEQUENCE_SCANNER_H
#define IMPALA_EXEC_HDFS_SEQUENCE_SCANNER_H
/// This scanner parses Sequence files and writes the content as tuples in the Impala
/// in-memory representation of data (tuples, rows, row batches).
///
/// TODO: Make the various sequence file formats behave more similarly. They should
/// all have a structure similar to the block-compressed format, operating in
/// batches rather than one row at a time.
///
/// org.apache.hadoop.io.SequenceFile is the original SequenceFile implementation
/// and should be viewed as the canonical definition of this format. If
/// anything is unclear in this file you should consult the code in
/// org.apache.hadoop.io.SequenceFile.
///
/// The following is a pseudo-BNF grammar for SequenceFile. Comments are prefixed
/// with dashes:
///
/// seqfile ::=
/// <file-header>
/// <record-block>+
///
/// record-block ::=
/// <record>+
/// <file-sync-hash>
///
/// file-header ::=
/// <file-version-header>
/// <file-key-class-name>
/// <file-value-class-name>
/// <file-is-compressed>
/// <file-is-block-compressed>
/// [<file-compression-codec-class>]
/// <file-header-metadata>
/// <file-sync-field>
///
/// file-version-header ::= Byte[4] {'S', 'E', 'Q', 6}
///
/// -- The name of the Java class responsible for reading the key buffer
///
/// file-key-class-name ::=
/// Text {"org.apache.hadoop.io.BytesWritable"}
///
/// -- The name of the Java class responsible for reading the value buffer
///
/// -- We don't care what this is.
/// file-value-class-name ::= Text
///
/// -- Boolean variable indicating whether or not the file uses compression
/// -- for key/values in this file
///
/// file-is-compressed ::= Byte[1]
///
/// -- A boolean field indicating whether or not the file is block compressed.
///
/// file-is-block-compressed ::= Byte[1]
///
/// -- The Java class name of the compression codec iff <file-is-compressed>
/// -- is true. The named class must implement
/// -- org.apache.hadoop.io.compress.CompressionCodec.
/// -- The expected value is org.apache.hadoop.io.compress.GzipCodec.
///
/// file-compression-codec-class ::= Text
///
/// -- A collection of key-value pairs defining metadata values for the
/// -- file. The Map is serialized using standard JDK serialization, i.e.
/// -- an Int corresponding to the number of key-value pairs, followed by
/// -- Text key and value pairs.
///
/// file-header-metadata ::= Map<Text, Text>
///
/// -- A 16-byte marker that is generated by the writer. This marker appears
/// -- at regular intervals at the beginning of records or record blocks, and is
/// -- intended to enable readers to skip to a random part of the file.
/// -- The sync hash is preceded by a length of -1, referred to as the sync marker.
///
/// file-sync-hash ::= Byte[16]
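///
/// For illustration only, a minimal sketch of parsing the header laid out
/// above. 'Reader' and its ReadBytes()/ReadByte()/ReadInt()/ReadText() helpers
/// are hypothetical stand-ins (raw bytes, a single byte, a big-endian Int, a
/// length-prefixed Text); they are not Impala or Hadoop APIs:
///
///   struct SeqFileHeaderInfo {
///     std::string key_class, value_class, codec_class;
///     bool is_compressed, is_block_compressed;
///     std::map<std::string, std::string> metadata;
///     uint8_t sync[16];
///   };
///
///   // Returns false on a malformed header; assumes <cstring>, <map>, <string>.
///   bool ParseFileHeader(Reader* in, SeqFileHeaderInfo* h) {
///     uint8_t version[4];
///     in->ReadBytes(version, 4);
///     if (memcmp(version, "SEQ\x06", 4) != 0) return false;  // version header
///     h->key_class = in->ReadText();
///     h->value_class = in->ReadText();
///     h->is_compressed = in->ReadByte() != 0;
///     h->is_block_compressed = in->ReadByte() != 0;
///     if (h->is_compressed) h->codec_class = in->ReadText();
///     int32_t num_pairs = in->ReadInt();  // metadata entry count
///     for (int32_t i = 0; i < num_pairs; ++i) {
///       std::string key = in->ReadText();
///       h->metadata[key] = in->ReadText();
///     }
///     in->ReadBytes(h->sync, sizeof(h->sync));  // per-file sync hash
///     return true;
///   }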
///
/// -- Records are all of one type as determined by the compression bits in the header
///
/// record ::=
/// <uncompressed-record> |
/// <block-compressed-record> |
/// <record-compressed-record>
///
/// uncompressed-record ::=
/// <record-length>
/// <key-length>
/// <key>
/// <value>
///
/// record-compressed-record ::=
/// <record-length>
/// <key-length>
/// <key>
/// <compressed-value>
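///
/// For illustration, a sketch of reading one record in the two row-oriented
/// forms above ('in' is the same hypothetical reader as in the header sketch).
/// Per the sync-marker note earlier, a record-length of -1 means a sync marker
/// was written instead of a record:
///
///   int32_t record_len = in->ReadInt();
///   if (record_len == -1) {    // sync marker: the 16-byte sync hash follows
///     in->SkipBytes(16);
///     record_len = in->ReadInt();
///   }
///   int32_t key_len = in->ReadInt();
///   in->SkipBytes(key_len);    // the scanner does not materialize the key
///   int32_t value_len = record_len - key_len;  // the value is the remainder
///   // For record-compressed files, those value_len bytes are a compressed
///   // buffer that must be decompressed before the row is parsed.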
///
/// block-compressed-record ::=
/// <file-sync-field>
/// <key-lengths-block-size>
/// <key-lengths-block>
/// <keys-block-size>
/// <keys-block>
/// <value-lengths-block-size>
/// <value-lengths-block>
/// <values-block-size>
/// <values-block>
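///
/// For illustration, the parse order of one block-compressed record block as a
/// sketch ('ReadVInt', 'ReadBlock', and 'Decompress' are hypothetical helpers;
/// block sizes are read per the grammar below). Note that Hadoop's writer also
/// emits a VInt record count between the sync marker and the key-lengths block
/// (omitted from the production above); it is what feeds
/// 'num_buffered_records_in_compressed_block_' in the class below:
///
///   in->SkipBytes(4 + 16);                 // -1 marker plus 16-byte sync hash
///   int64_t num_records = in->ReadVInt();  // records in this block
///   Buffer key_lens   = Decompress(in->ReadBlock(in->ReadInt()));
///   Buffer keys       = Decompress(in->ReadBlock(in->ReadInt()));
///   Buffer value_lens = Decompress(in->ReadBlock(in->ReadInt()));
///   Buffer values     = Decompress(in->ReadBlock(in->ReadInt()));
///   // 'key_lens' and 'value_lens' then decode as num_records VInts each,
///   // giving the extent of every key and value inside 'keys' and 'values'.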
///
/// record-length ::= Int
/// key-length ::= Int
/// key-lengths-block-size ::= Int
/// keys-block-size ::= Int
/// value-lengths-block-size ::= Int
/// values-block-size ::= Int
///
/// keys-block ::= Byte[keys-block-size]
/// values-block ::= Byte[values-block-size]
///
/// -- The key-lengths and value-lengths blocks are a sequence of lengths encoded
/// -- in ZeroCompressedInteger (VInt) format.
///
/// key-lengths-block ::= Byte[key-lengths-block-size]
/// value-lengths-block ::= Byte[value-lengths-block-size]
///
/// Byte ::= An eight-bit byte
///
/// VInt ::= Variable length integer. The high-order bit of each byte
/// indicates whether more bytes remain to be read. The low-order seven
/// bits are appended as increasingly more significant bits in the
/// resulting integer value.
///
/// Int ::= A four-byte integer in big-endian format.
///
/// Text ::= VInt, Chars (Length prefixed UTF-8 characters)
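///
/// For illustration, a sketch decoding VInt and Text exactly as described
/// above. This follows the generic varint description given here; Hadoop's
/// WritableUtils actually encodes its zero-compressed integers with a
/// different first-byte scheme, so treat this as a sketch of the description,
/// not of Hadoop's precise wire format (assumes <cstdint> and <string>):
///
///   // Returns the number of bytes consumed; the decoded value goes in 'value'.
///   inline int DecodeVInt(const uint8_t* buf, int64_t* value) {
///     int64_t result = 0;
///     int shift = 0, i = 0;
///     uint8_t byte;
///     do {
///       byte = buf[i++];
///       result |= static_cast<int64_t>(byte & 0x7f) << shift;  // low 7 bits
///       shift += 7;
///     } while (byte & 0x80);  // high bit set => more bytes remain
///     *value = result;
///     return i;
///   }
///
///   // A Text field is a VInt byte length followed by that many UTF-8 bytes.
///   inline int DecodeText(const uint8_t* buf, std::string* out) {
///     int64_t len;
///     int consumed = DecodeVInt(buf, &len);
///     out->assign(reinterpret_cast<const char*>(buf) + consumed, len);
///     return consumed + static_cast<int>(len);
///   }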
#include "exec/base-sequence-scanner.h"
namespace impala {
template <bool>
class DelimitedTextParser;
class HdfsSequenceScanner : public BaseSequenceScanner {
public:
/// The four-byte SeqFile version header present at the beginning of every
/// SeqFile: {'S', 'E', 'Q', 6}
static const uint8_t SEQFILE_VERSION_HEADER[4];
HdfsSequenceScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
virtual ~HdfsSequenceScanner();
/// Implementation of HdfsScanner interface.
virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
/// Codegen WriteAlignedTuples(). Stores the resulting function in
/// 'write_aligned_tuples_fn' if codegen was successful, or nullptr otherwise.
static Status Codegen(HdfsScanNodeBase* node,
const std::vector<ScalarExpr*>& conjuncts,
llvm::Function** write_aligned_tuples_fn)
WARN_UNUSED_RESULT;
protected:
/// Implementation of sequence container super class methods.
virtual FileHeader* AllocateFileHeader();
virtual Status ReadFileHeader() WARN_UNUSED_RESULT;
virtual Status InitNewRange() WARN_UNUSED_RESULT;
virtual Status ProcessRange(RowBatch* row_batch) WARN_UNUSED_RESULT;
virtual THdfsFileFormat::type file_format() const {
return THdfsFileFormat::SEQUENCE_FILE;
}
private:
/// Maximum size of a compressed block. This is used to sanity-check block sizes
/// so that a corrupted size is detected before we read the whole file.
static const int64_t MAX_BLOCK_SIZE = 1024 * 1024 * 1024;
/// The value class name located in the SeqFile Header.
/// This is always "org.apache.hadoop.io.Text".
static const char* const SEQFILE_VALUE_CLASS_NAME;
/// Reads the record header and sets 'current_block_length_'.
Status ReadBlockHeader() WARN_UNUSED_RESULT;
/// Processes or continues processing a block-compressed scan range, adding tuples
/// to 'row_batch'. Block-compressed ranges are common and can be parsed more
/// efficiently in larger pieces.
Status ProcessBlockCompressedScanRange(RowBatch* row_batch) WARN_UNUSED_RESULT;
/// Reads a compressed block. Does NOT read the sync or the -1 marker preceding it.
/// Decompresses the data into 'unparsed_data_buffer_', allocated from
/// 'data_buffer_pool_' via the decompressor.
/// Sets 'num_buffered_records_in_compressed_block_' if decompression was
/// successful.
Status ReadCompressedBlock() WARN_UNUSED_RESULT;
/// Utility function for parsing 'next_record_in_compressed_block_'. Called by
/// ProcessBlockCompressedScanRange().
Status ProcessDecompressedBlock(RowBatch* row_batch) WARN_UNUSED_RESULT;
/// Read a single record from the current position in 'stream_', decompressing
/// the record if necessary. Not used for block-compressed files.
/// Output:
/// record_ptr: pointer to the record
/// record_len: length of the record
Status GetRecord(uint8_t** record_ptr, int64_t* record_len) WARN_UNUSED_RESULT;
/// Helper class for picking fields and rows from delimited text.
boost::scoped_ptr<DelimitedTextParser<false>> delimited_text_parser_;
std::vector<FieldLocation> field_locations_;
/// Header data that is fixed for the file. This struct is shared between scan ranges.
struct SeqFileHeader : public BaseSequenceScanner::FileHeader {
/// If true, the file uses row compression
bool is_row_compressed;
};
/// Struct for record locations and lengths in compressed blocks.
struct RecordLocation {
uint8_t* record;
int64_t len;
};
/// Records are processed in batches. This vector stores the locations of the
/// records in the batch currently being processed.
std::vector<RecordLocation> record_locations_;
/// Length of the current sequence file block (or record).
int current_block_length_ = -1;
/// Length of the current key. This is specified as 4 bytes in the format description.
int current_key_length_ = -1;
/// Buffer for data read from the 'stream_' directly or after decompression.
uint8_t* unparsed_data_buffer_ = nullptr;
/// End of the data buffer, used to check for out-of-bounds reads.
uint8_t* data_buffer_end_ = nullptr;
/// Number of buffered records in 'unparsed_data_buffer_' from block-compressed data.
int64_t num_buffered_records_in_compressed_block_ = 0;
/// Length of the next record from block-compressed data.
int64_t next_record_in_compressed_block_len_ = 0;
/// Pointer to the next record in block-compressed data.
uint8_t* next_record_in_compressed_block_ = nullptr;
};
} // namespace impala
#endif // IMPALA_EXEC_HDFS_SEQUENCE_SCANNER_H