blob: 29f0e2c8254ee1f391125e8f04452950f9d12f9a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "common/hdfs.h"
#include "runtime/io/file-reader.h"
namespace impala {
namespace io {
class DataCache;
/// File reader class for HDFS.
class HdfsFileReader : public FileReader {
public:
HdfsFileReader(ScanRange* scan_range, hdfsFS hdfs_fs, bool expected_local) :
FileReader(scan_range), hdfs_fs_(hdfs_fs), expected_local_(expected_local) {
}
~HdfsFileReader();
virtual Status Open(bool use_file_handle_cache) override;
virtual Status ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_t* buffer,
int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override;
virtual void Close() override;
virtual void ResetState() override;
virtual std::string DebugString() const override;
/// Reads from the DN cache. On success, sets cached_buffer_ to the DN buffer
/// and returns a pointer to the underlying raw buffer. 'cached_buffer_' is set to
/// nullptr if the data is not cached and 'length' is set to 0.
///
/// Please note that this interface is only effective for local HDFS reads as it
/// relies on HDFS caching. For remote reads, this interface is not used.
virtual void CachedFile(uint8_t** data, int64_t* length) override;
private:
/// Probes 'remote_data_cache' for a hit. The requested file's name and mtime
/// are stored in 'scan_range_'. 'file_offset' is the offset into the file to read
/// and 'bytes_to_read' is the number of bytes requested. On success, copies the
/// content of the cache into 'buffer' and returns the number of bytes read.
/// Also updates various cache metrics. Returns 0 on cache miss.
int64_t ReadDataCache(DataCache* remote_data_cache, int64_t file_offset,
uint8_t* buffer, int64_t bytes_to_read);
/// Inserts into 'remote_data_cache' with 'buffer' which contains the data read
/// from a file at 'file_offset'. 'buffer_len' is the length of the buffer in bytes.
/// The file's name and mtime are stored in 'scan_range_'. 'cached_bytes_missed' is
/// the number of bytes missed in the cache. Used for updating cache metrics.
/// No guarantee that the entry is inserted as caching is opportunistic.
void WriteDataCache(DataCache* remote_data_cache, int64_t file_offset,
const uint8_t* buffer, int64_t buffer_len, int64_t cached_bytes_missed);
/// Read [position_in_file, position_in_file + bytes_to_read) from 'hdfs_file'
/// into 'buffer'. Update 'bytes_read' on success. Returns error status on
/// failure. When not using HDFS pread, this function will always implicitly
/// seek to 'position_in_file' if 'hdfs_file' is not at it already.
/// 'disk_queue' metrics are updated based on the operation.
Status ReadFromPosInternal(hdfsFile hdfs_file, DiskQueue* disk_queue,
int64_t position_in_file, uint8_t* buffer, int64_t bytes_to_read, int* bytes_read);
/// Update counters with HDFS read statistics from 'hdfs_file'. If 'log_stats' is
/// true, the statistics are logged.
void GetHdfsStatistics(hdfsFile hdfs_file, bool log_stats);
/// Hadoop filesystem that contains the file being read.
hdfsFS const hdfs_fs_;
/// The hdfs file handle is stored here in three cases:
/// 1. The file handle cache is off (max_cached_file_handles == 0).
/// 2. The scan range is using hdfs caching.
/// -OR-
/// 3. The hdfs file is expected to be remote (expected_local_ == false)
/// In each case, the scan range gets a new ExclusiveHdfsFileHandle at Open(),
/// owns it exclusively, and destroys it in Close().
std::unique_ptr<ExclusiveHdfsFileHandle> exclusive_hdfs_fh_;
/// If true, we expect the reads to be a local read. Note that if this is false,
/// it does not necessarily mean we expect the read to be remote, and that we never
/// create scan ranges where some of the range is expected to be remote and some of it
/// local.
/// TODO: we can do more with this
const bool expected_local_;
/// Total number of bytes read remotely. This is necessary to maintain a count of
/// the number of remote scan ranges. Since IO statistics can be collected multiple
/// times for a scan range, it is necessary to keep some state about whether this
/// scan range has already been counted as remote. There is also a requirement to
/// log the number of unexpected remote bytes for a scan range. To solve both
/// requirements, maintain num_remote_bytes_ on the ScanRange and push it to the
/// reader_ once at the close of the scan range.
int64_t num_remote_bytes_ = 0;
/// Non-NULL if a cached read succeeded. Then all the bytes for the file are in
/// this buffer.
hadoopRzBuffer* cached_buffer_ = nullptr;
};
}
}