blob: 3c2326e8189991830916bcf2d5d4070be29e1899 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_BASE_SEQUENCE_SCANNER_H
#define IMPALA_EXEC_BASE_SEQUENCE_SCANNER_H
#include <vector>
#include <memory>
#include <stdint.h>
#include "exec/hdfs-scanner.h"
namespace impala {
struct HdfsFileDesc;
class ScannerContext;
/// Superclass for all sequence container based file formats: SequenceFile, RCFile, Avro.
/// Sequence container formats have sync markers periodically in the file. This scanner
/// recovers from corrupt or otherwise non-parsable data blocks by skipping to the next
/// sync marker in the range.
///
/// Handling of sync markers:
/// A scanner is responsible for the data with the first complete sync in its scan range
/// through the first complete sync in the next range. 'eos_' is set to true when this
/// scanner has processed all the bytes it is responsible for, i.e., when it reads a sync
/// occurring completely in the next scan range, as this is the first sync that the next
/// scan range will be able to locate. Note that checking context_->eosr() after reading
/// each sync is insufficient for determining 'eos_'. If a sync marker spans two scan
/// ranges, the first scan range must process the following block since the second scan
/// range cannot find the incomplete sync. context_->eosr() will not alert us to this
/// situation, causing the block to be incorrectly skipped.
class BaseSequenceScanner : public HdfsScanner {
public:
/// Issue the initial ranges for all sequence container files. 'files' must not be
/// empty.
static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
const std::vector<HdfsFileDesc*>& files)
WARN_UNUSED_RESULT;
/// Returns true if 'format' uses a scanner derived from BaseSequenceScanner.
static bool FileFormatIsSequenceBased(THdfsFileFormat::type format);
virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
virtual void Close(RowBatch* row_batch);
virtual ~BaseSequenceScanner();
protected:
/// Size of the sync hash field.
const static int SYNC_HASH_SIZE = 16;
/// Data that is shared between scan ranges of the same file. The subclass is
/// responsible for filling in all these fields in ReadFileHeader
struct FileHeader {
virtual ~FileHeader() {}
/// The sync hash for this file.
uint8_t sync[SYNC_HASH_SIZE];
/// true if the file is compressed
bool is_compressed;
/// Codec name if it is compressed
std::string codec;
/// Enum for compression type.
THdfsCompression::type compression_type;
/// Byte size of header. This must not include the sync directly preceding the data
/// (even if the sync is considered part of the header in the file format spec).
int64_t header_size;
};
virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT;
/// Subclasses must implement these functions. The order for calls will be
/// 1. AllocateFileHeader() - called once per file
/// 2. ReadFileHeader() - called once per file
/// 3. InitNewRange() - called once per scan range
/// 4. ProcessRange()* - called until eos, bytes may be skipped between calls to
/// recover from parse errors
/// Allocate a file header object for this scanner. If the scanner needs
/// additional header information, it should subclass FileHeader.
/// The allocated object will be placed in the scan node's pool.
virtual FileHeader* AllocateFileHeader() = 0;
/// Read the file header. The underlying ScannerContext is at the start of
/// the file header. This function must read the file header (which advances
/// context_ past it) and initialize header_.
virtual Status ReadFileHeader() WARN_UNUSED_RESULT = 0;
/// Materializes tuples into 'row_batch' by reading from the underlying ScannerContext.
/// Assumes that the 'stream_' is positioned in the data portion of the range, i.e.,
/// not at a sync marker or other metadata of the range. May set 'eos_'.
virtual Status ProcessRange(RowBatch* row_batch) WARN_UNUSED_RESULT = 0;
/// Returns type of scanner: e.g. rcfile, seqfile
virtual THdfsFileFormat::type file_format() const = 0;
BaseSequenceScanner(HdfsScanNodeBase*, RuntimeState*);
/// Read sync marker from 'stream_' and validate against 'header_->sync'. Returns
/// non-ok if the sync marker did not match. Scanners should always use this function
/// to read sync markers, otherwise eos() might not be updated correctly. If eos()
/// returns true after calling this function, scanners must not process any more
/// records.
Status ReadSync() WARN_UNUSED_RESULT;
/// Utility function to advance 'stream_' past the next sync marker. If no sync is
/// found in the scan range, returns OK and sets 'eos_' to true. It is safe to call
/// this function past eosr.
/// - sync: sync marker to search for (does not include 0xFFFFFFFF prefix)
/// - sync_size: number of bytes for sync
Status SkipToSync(const uint8_t* sync, int sync_size) WARN_UNUSED_RESULT;
/// Estimate of header size in bytes. This is initial number of bytes to issue
/// per file. If the estimate is too low, more bytes will be read as necessary.
const static int HEADER_SIZE;
/// Sync indicator.
const static int SYNC_MARKER;
/// File header for this scan range. This is not owned by the parent scan node.
FileHeader* header_ = nullptr;
/// If true, this scanner object is only for processing the header.
bool only_parsing_header_ = false;
/// Unit test constructor
BaseSequenceScanner();
private:
/// Byte offset from the start of the file for the current block. Note that block refers
/// to all the data between two syncs.
int64_t block_start_ = 0;
/// The total number of bytes in all blocks this scan range has processed (updated in
/// SkipToSync(), so only includes blocks that were completely processed).
int total_block_size_ = 0;
/// The number of syncs seen by this scanner so far.
int num_syncs_ = 0;
/// Callback for stream_ to compute how much to read past the scan range. Returns the
/// average number of bytes per block minus how far 'file_offset' is into the current
/// block.
int ReadPastSize(int64_t file_offset);
/// Utility function to look for 'sync' in buffer. Returns the offset into
/// buffer of the _end_ of sync if it is found, otherwise, returns -1.
int FindSyncBlock(const uint8_t* buffer, int buffer_len, const uint8_t* sync,
int sync_len);
/// Close skipped ranges for 'file'. This is only called when processing
/// the header range and the header had an issue.
void CloseFileRanges(const char* file);
/// Number of bytes skipped when advancing to next sync on error.
RuntimeProfile::Counter* bytes_skipped_counter_ = nullptr;
};
}
#endif