blob: a909e00035a25038d650b1bc9bdfae351e4ae8bb [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_BASE_SEQUENCE_SCANNER_H
#define IMPALA_EXEC_BASE_SEQUENCE_SCANNER_H
#include <vector>
#include <memory>
#include <stdint.h>
#include "exec/hdfs-scanner.h"
namespace impala {
/// Forward declarations. This header only refers to these types through pointers
/// (HdfsFileDesc* in IssueInitialRanges(), ScannerContext* in Open()), so their full
/// definitions are not needed here.
struct HdfsFileDesc;
class ScannerContext;
/// Superclass for all sequence container based file formats:
/// e.g. SequenceFile, RCFile, Avro
/// Sequence container formats have sync markers periodically in the file.
/// This class will skip to the start of sync markers for errors and
/// hdfs splits.
class BaseSequenceScanner : public HdfsScanner {
 public:
  /// Issue the initial ranges for all sequence container files.
  static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
      const std::vector<HdfsFileDesc*>& files);

  /// Scanner entry points, defined out of line.
  /// NOTE(review): presumably these override the HdfsScanner virtuals of the same
  /// name -- the base class declaration is not visible from this header; confirm.
  virtual Status Open(ScannerContext* context);
  virtual void Close(RowBatch* row_batch);
  virtual Status ProcessSplit();

  virtual ~BaseSequenceScanner();

 protected:
  /// Size of the sync hash field.
  const static int SYNC_HASH_SIZE = 16;

  /// Data that is shared between scan ranges of the same file. The subclass is
  /// responsible for filling in all these fields in ReadFileHeader().
  struct FileHeader {
    /// Virtual so that subclass headers (see AllocateFileHeader()) can be destroyed
    /// through a FileHeader pointer.
    virtual ~FileHeader() {}

    /// The sync hash for this file.
    uint8_t sync[SYNC_HASH_SIZE];

    /// true if the file is compressed
    bool is_compressed;

    /// Codec name if it is compressed
    std::string codec;

    /// Enum for compression type.
    THdfsCompression::type compression_type;

    /// Byte size of header. This must not include the sync directly preceding the data
    /// (even if the sync is considered part of the header in the file format spec).
    int64_t header_size;
  };

  /// Subclasses must implement these functions. The order for calls will be
  ///  1. AllocateFileHeader() - called once per file
  ///  2. ReadFileHeader() - called once per file
  ///  3. InitNewRange()
  ///  4. ProcessRange()
  /// In the normal case, 3 and 4 are called for each scan range once. In the
  /// case of errors and skipped bytes, 4 is repeatedly called, each time
  /// starting right after the sync marker.

  /// Allocate a file header object for this scanner. If the scanner needs
  /// additional header information, it should subclass FileHeader.
  /// The allocated object will be placed in the scan node's pool.
  virtual FileHeader* AllocateFileHeader() = 0;

  /// Read the file header. The underlying ScannerContext is at the start of
  /// the file header. This function must read the file header (which advances
  /// context_ past it) and initialize header_.
  virtual Status ReadFileHeader() = 0;

  /// Process the current range until the end or an error occurred. Note this might
  /// be called multiple times if we skip over bad data.
  /// This function should read from the underlying ScannerContext materializing
  /// tuples to the context. When this function is called, it is guaranteed to be
  /// at the start of a data block (i.e. right after the sync marker).
  virtual Status ProcessRange() = 0;

  /// Returns type of scanner: e.g. rcfile, seqfile
  virtual THdfsFileFormat::type file_format() const = 0;

  /// Constructor used by subclasses; defined out of line.
  BaseSequenceScanner(HdfsScanNodeBase*, RuntimeState*);

  /// Read and validate sync marker against header_->sync. Returns non-ok if the sync
  /// marker did not match. Scanners should always use this function to read sync
  /// markers, otherwise finished() might not be updated correctly. If finished()
  /// returns true after calling this function, scanners must not process any more
  /// records.
  Status ReadSync();

  /// Utility function to advance past the next sync marker, reading bytes from stream_.
  /// If no sync is found in the scan range, returns Status::OK and sets finished_ to
  /// true. It is safe to call this function past eosr.
  /// - sync: sync marker to search for (does not include 0xFFFFFFFF prefix)
  /// - sync_size: number of bytes for sync
  Status SkipToSync(const uint8_t* sync, int sync_size);

  /// Returns finished_ (see the comment on that member for when it becomes true).
  /// const-qualified because it only reads state, so it is also callable through a
  /// const reference or pointer to the scanner.
  bool finished() const { return finished_; }

  /// Estimate of header size in bytes. This is initial number of bytes to issue
  /// per file. If the estimate is too low, more bytes will be read as necessary.
  const static int HEADER_SIZE;

  /// Sync indicator.
  /// NOTE(review): presumably the 0xFFFFFFFF prefix mentioned in SkipToSync(); the
  /// value is defined outside this header -- confirm.
  const static int SYNC_MARKER;

  /// File header for this scan range. This is not owned by the parent scan node.
  FileHeader* header_;

  /// If true, this scanner object is only for processing the header.
  bool only_parsing_header_;

  /// Unit test constructor
  BaseSequenceScanner();

 private:
  /// Set to true when this scanner has processed all the bytes it is responsible
  /// for, i.e., when it reads a sync occurring completely in the next scan
  /// range, as this is the first sync that the next scan range will be able to
  /// locate. (Each scan range is responsible for the first complete sync in the
  /// range through the first complete sync in the next range.)
  ///
  /// We need this variable because checking context_->eosr() after reading each
  /// sync is insufficient. If a sync marker spans two scan ranges, the first
  /// scan range must process the following block since the second scan range
  /// cannot find the incomplete sync. context_->eosr() will not alert us to this
  /// situation, causing the block to be skipped.
  ///
  /// finished_ is set by ReadSync() and SkipToSync().
  bool finished_;

  /// Byte offset from the start of the file for the current block. Note that block
  /// refers to all the data between two syncs.
  int64_t block_start_;

  /// The total number of bytes in all blocks this scan range has processed (updated in
  /// SkipToSync(), so only includes blocks that were completely processed).
  /// NOTE(review): this is an int while block_start_ is int64_t; confirm the total
  /// cannot exceed the int range for large scan ranges.
  int total_block_size_;

  /// The number of syncs seen by this scanner so far.
  int num_syncs_;

  /// Callback for stream_ to compute how much to read past the scan range. Returns the
  /// average number of bytes per block minus how far 'file_offset' is into the current
  /// block.
  int ReadPastSize(int64_t file_offset);

  /// Utility function to look for 'sync' in buffer. Returns the offset into
  /// buffer of the _end_ of sync if it is found, otherwise, returns -1.
  int FindSyncBlock(const uint8_t* buffer, int buffer_len, const uint8_t* sync,
      int sync_len);

  /// Close skipped ranges for 'file'. This is only called when processing
  /// the header range and the header had an issue.
  void CloseFileRanges(const char* file);

  /// Number of bytes skipped when advancing to next sync on error.
  RuntimeProfile::Counter* bytes_skipped_counter_;
};
}
#endif