blob: fb243179baf557a65a16a38da84dcef0e3478009 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "io/fs/http_file_reader.h"
#include <curl/curl.h>
#include <curl/easy.h>
#include <algorithm>
#include "common/logging.h"
namespace doris::io {
// Factory: builds an HttpFileReader for `url` whose extend_info is `props`
// (custom "http.header.*" headers, etag, last_modified, file_size and the
// http.* Range options), then opens it before handing it to the caller.
// Any failure from open() is returned as the Result's error.
Result<FileReaderSPtr> HttpFileReader::create(const std::string& url,
                                              const std::map<std::string, std::string>& props,
                                              const FileReaderOptions& opts,
                                              RuntimeProfile* /*profile*/) {
    OpenFileInfo file_info;
    file_info.extend_info = props;
    file_info.path = Path(url);

    auto http_reader = std::make_shared<HttpFileReader>(file_info, url);
    // open() sizes the object and validates Range support / size limits (it is a
    // no-op when file_size was already supplied in props).
    RETURN_IF_ERROR_RESULT(http_reader->open(opts));
    return http_reader;
}
// Builds a reader for `url` from the metadata carried in `fileInfo.extend_info`:
//   etag, last_modified, file_size   - optional object metadata
//   http.enable.range.request        - "false"/"0" (case-insensitive) disables Range requests
//   http.max.request.size.bytes      - size cap for downloads in non-Range mode
// Malformed numeric values are logged and ignored rather than letting a
// std::invalid_argument / std::out_of_range escape the constructor.
// A valid file_size marks the size as known and the reader as initialized, so
// open() will return immediately without contacting the server.
HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url)
        : _extend_kv(fileInfo.extend_info),
          _path(fileInfo.path),
          _url(std::move(url)),
          _client(std::make_unique<HttpClient>()) {
    auto etag_iter = _extend_kv.find("etag");
    if (etag_iter != _extend_kv.end()) {
        _etag = etag_iter->second;
    }
    auto lm_iter = _extend_kv.find("last_modified");
    if (lm_iter != _extend_kv.end()) {
        try {
            _last_modified = std::stoll(lm_iter->second);
        } catch (const std::exception& e) {
            LOG(WARNING) << "Invalid last_modified value: " << lm_iter->second
                         << ", ignoring it: " << e.what();
        }
    }
    auto size_iter = _extend_kv.find("file_size");
    if (size_iter != _extend_kv.end()) {
        try {
            _file_size = std::stoull(size_iter->second);
            // The caller supplied the size, so read_at_impl() may rely on it for EOF
            // checks (open() sets the same flag after its HEAD request).
            _size_known = true;
            // NOTE(review): this also makes open() skip detect_range_support() and the
            // non-Range size check; confirm the header's _range_supported default is
            // what callers passing file_size expect.
            _initialized = true;
        } catch (const std::exception& e) {
            LOG(WARNING) << "Invalid file_size value: " << size_iter->second
                         << ", it will be queried from the server: " << e.what();
        }
    }
    // Parse configuration for non-Range request handling
    auto enable_range_iter = _extend_kv.find("http.enable.range.request");
    if (enable_range_iter != _extend_kv.end()) {
        // Lower-case for a case-insensitive comparison; go through unsigned char
        // because ::tolower is undefined for negative char values.
        std::string value = enable_range_iter->second;
        std::transform(value.begin(), value.end(), value.begin(),
                       [](unsigned char c) { return static_cast<char>(::tolower(c)); });
        _enable_range_request = (value != "false" && value != "0");
    }
    auto max_size_iter = _extend_kv.find("http.max.request.size.bytes");
    if (max_size_iter != _extend_kv.end()) {
        try {
            _max_request_size_bytes = std::stoull(max_size_iter->second);
        } catch (const std::exception&) {
            LOG(WARNING) << "Invalid http.max.request.size.bytes value: " << max_size_iter->second
                         << ", using default: " << DEFAULT_MAX_REQUEST_SIZE;
            _max_request_size_bytes = DEFAULT_MAX_REQUEST_SIZE;
        }
    }
    _read_buffer = std::make_unique<char[]>(READ_BUFFER_SIZE);
}
HttpFileReader::~HttpFileReader() {
    // A destructor has no way to report failure. close() is idempotent
    // (guarded by _closed), so its status is intentionally discarded here.
    Status close_status = close();
    static_cast<void>(close_status);
}
// Prepares the reader for read_at(): learns the object size with a HEAD request
// and decides whether reads go through HTTP Range requests.
//   * http.enable.range.request=false -> non-Range mode; returns InternalError if
//     the file is larger than _max_request_size_bytes.
//   * otherwise (default)             -> probes the server and returns NotSupported
//     if it does not honor Range requests.
// An already-initialized reader (e.g. file_size supplied in the properties)
// returns OK immediately without touching the network. `opts` is unused.
Status HttpFileReader::open(const FileReaderOptions& opts) {
    if (_initialized) {
        return Status::OK();
    }

    // HEAD the resource (fail-on-error enabled) to learn its size.
    RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true));
    _client->set_method(HttpMethod::HEAD);
    RETURN_IF_ERROR(_client->execute());
    uint64_t head_content_length = 0;
    RETURN_IF_ERROR(_client->get_content_length(&head_content_length));
    _file_size = head_content_length;
    _size_known = true;

    if (_enable_range_request) {
        // Default path: Range requests are required, so confirm the server honors them.
        VLOG(1) << "Detecting Range support for URL: " << _url;
        RETURN_IF_ERROR(detect_range_support());
        if (!_range_supported) {
            return Status::NotSupported(
                    "HTTP server does not support Range requests (RFC 7233), which is required "
                    "for reading files. File size: {} bytes, URL: {}. "
                    "To allow reading without Range support, set "
                    "'http.enable.range.request'='false' "
                    "in properties and configure 'http.max.request.size.bytes' appropriately "
                    "(note: this may cause high memory usage for large files).",
                    _file_size, _url);
        }
        LOG(INFO) << "HTTP server supports Range requests for " << _url;
    } else {
        // Range requests explicitly disabled: reads fetch the whole body without a
        // Range header, so reject files larger than the configured cap up front.
        _range_supported = false;
        LOG(INFO) << "Range requests disabled by configuration for " << _url
                  << ", using non-Range mode. File size: " << _file_size << " bytes";
        if (_file_size > _max_request_size_bytes) {
            return Status::InternalError(
                    "Non-Range mode: file size ({} bytes) exceeds maximum allowed size ({} bytes, "
                    "configured by http.max.request.size.bytes). URL: {}",
                    _file_size, _max_request_size_bytes, _url);
        }
        LOG(INFO) << "Non-Range mode validated for " << _url << ", file size: " << _file_size
                  << " bytes, max allowed: " << _max_request_size_bytes << " bytes";
    }

    _initialized = true;
    return Status::OK();
}
// Reads up to result.size bytes starting at `offset` into result.data and stores
// the number of bytes produced in *bytes_read (0 at or past a known EOF).
//
// Bytes are served first from the single read-ahead window
// [_buffer_start, _buffer_end) kept in _read_buffer. Anything still missing is
// fetched with one HTTP GET:
//   * Range mode (_range_supported): requests max(remaining, READ_BUFFER_SIZE)
//     bytes at `offset`; reads that fit in READ_BUFFER_SIZE refill the window.
//   * Non-Range mode: no Range header is sent, so the body is the whole file
//     (capped at _max_request_size_bytes) and the requested slice is cut from it.
// The GET runs without fail-on-error, so HTTP statuses >= 400 are turned into
// errors here instead of being returned as file content. A 416 answer to a Range
// request means the start offset is past the end of the resource and is EOF.
Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                    const IOContext* /*io_ctx*/) {
    VLOG(2) << "HttpFileReader::read_at_impl offset=" << offset << " size=" << result.size
            << " url=" << _url << " range_supported=" << _range_supported;
    // close() releases the buffer; recreate it lazily if needed.
    if (!_read_buffer) {
        _read_buffer = std::make_unique<char[]>(READ_BUFFER_SIZE);
    }
    size_t to_read = result.size;
    size_t buffer_offset = 0;
    if (_size_known && offset >= _file_size) {
        *bytes_read = 0;
        return Status::OK();
    }
    // Try to serve from buffer cache
    if (offset >= _buffer_start && offset < _buffer_end) {
        size_t buffer_idx = offset - _buffer_start;
        size_t available = _buffer_end - offset;
        size_t copy_len = std::min(available, to_read);
        DCHECK(buffer_idx + copy_len <= READ_BUFFER_SIZE)
                << "Buffer overflow: buffer_idx=" << buffer_idx << " copy_len=" << copy_len
                << " READ_BUFFER_SIZE=" << READ_BUFFER_SIZE;
        std::memcpy(result.data, _read_buffer.get() + buffer_idx, copy_len);
        buffer_offset += copy_len;
        to_read -= copy_len;
        offset += copy_len;
        VLOG(2) << "Buffer cache hit: copied " << copy_len << " bytes";
    } else {
        // Buffer miss, invalidate cache
        _buffer_start = 0;
        _buffer_end = 0;
        VLOG(2) << "Buffer cache miss";
    }
    if (to_read == 0) {
        *bytes_read = buffer_offset;
        return Status::OK();
    }
    // Clamp the request to the known file size.
    size_t remaining = to_read;
    if (_size_known) {
        uint64_t left = (_file_size > offset) ? (_file_size - offset) : 0;
        if (left == 0) {
            *bytes_read = buffer_offset;
            return Status::OK();
        }
        remaining = std::min<uint64_t>(to_read, left);
    }
    // Fetch at least one full read-ahead window so small sequential reads are cheap.
    size_t req_len = (remaining > READ_BUFFER_SIZE) ? remaining : READ_BUFFER_SIZE;
    VLOG(2) << "Issuing HTTP GET request: offset=" << offset << " req_len=" << req_len
            << " with_range=" << _range_supported;
    // Prepare and initialize the HTTP client for GET request
    RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/false));
    _client->set_method(HttpMethod::GET);
    _client->set_header("Expect", "");
    _client->set_header("Connection", "close");
    const bool with_range = _range_supported;
    if (with_range) {
        _client->set_range(offset, req_len);
    }
    std::string buf;
    buf.reserve(req_len);
    size_t total_received = 0;
    bool size_limit_exceeded = false;
    auto cb = [&](const void* data, size_t len) {
        total_received += len;
        // If using non-Range mode, enforce size limit to prevent OOM
        if (!with_range && total_received > _max_request_size_bytes) {
            size_limit_exceeded = true;
            VLOG(1) << "Stopping download: received " << total_received << " bytes, exceeds limit "
                    << _max_request_size_bytes;
            return false; // Stop receiving - this will cause CURL to return an error
        }
        buf.append(reinterpret_cast<const char*>(data), len);
        return true;
    };
    Status exec_status = _client->execute(cb);
    // Check if we stopped due to size limit - this is expected behavior
    if (size_limit_exceeded) {
        return Status::InternalError(
                "HTTP response too large: received {} bytes, exceeds maximum allowed size {} "
                "bytes (configured by http.max.request.size.bytes). URL: {}",
                total_received, _max_request_size_bytes, _url);
    }
    // If there's an error and it's not due to our size limit check, return it
    RETURN_IF_ERROR(exec_status);
    long http_status = _client->get_http_status();
    VLOG(2) << "HTTP response: status=" << http_status << " received_bytes=" << buf.size();
    // 416 Range Not Satisfiable: the requested start lies beyond the end of the
    // resource, i.e. EOF for a reader whose size is unknown or has shrunk.
    if (with_range && http_status == 416) {
        *bytes_read = buffer_offset;
        return Status::OK();
    }
    // fail-on-error is disabled for GET, so an error page would otherwise be
    // handed back to the caller as file data.
    if (http_status >= 400) {
        return Status::InternalError("HTTP GET failed with status {} (offset: {}). URL: {}",
                                     http_status, offset, _url);
    }
    if (buf.empty()) {
        *bytes_read = buffer_offset;
        return Status::OK();
    }
    // Defensive check: if we sent Range but server returned 200 instead of 206
    // This should rarely happen since we detect Range support in open()
    if (with_range && offset > 0 && http_status == 200) {
        LOG(ERROR) << "HTTP server unexpectedly does not support Range requests for " << _url
                   << " (this should have been detected in open()). HTTP status: " << http_status
                   << ", received: " << buf.size()
                   << " bytes. This indicates a server behavior change.";
        return Status::InternalError(
                "HTTP server does not support Range requests but this was not detected during "
                "file open. This may indicate the server behavior has changed. "
                "HTTP status: {}, received: {} bytes. URL: {}",
                http_status, buf.size(), _url);
    }
    // Non-Range mode: the body is the whole file starting at byte 0, for every
    // offset (including 0), so cut the requested slice out of it.
    if (!with_range) {
        if (offset >= buf.size()) {
            *bytes_read = buffer_offset;
            return Status::OK();
        }
        const size_t available = buf.size() - offset;
        const size_t slice_len = std::min<size_t>(remaining, available);
        std::memcpy(result.data + buffer_offset, buf.data() + offset, slice_len);
        buffer_offset += slice_len;
        // Keep up to one window of the downloaded body (starting at offset) so the
        // next sequential read can be served without re-downloading the file.
        const size_t cached = std::min(available, static_cast<size_t>(READ_BUFFER_SIZE));
        std::memcpy(_read_buffer.get(), buf.data() + offset, cached);
        _buffer_start = offset;
        _buffer_end = offset + cached;
        *bytes_read = buffer_offset;
        return Status::OK();
    }
    // Range mode. Decide on the clamped length actually wanted, not the raw slice size.
    if (remaining > READ_BUFFER_SIZE) {
        // Large read: the response goes straight into the caller's buffer.
        if (buf.size() > remaining) {
            return Status::InternalError("HTTP response larger than requested buffer");
        }
        std::memcpy(result.data + buffer_offset, buf.data(), buf.size());
        buffer_offset += buf.size();
    } else {
        // Small read: refill the read-ahead window, then copy out of it.
        const size_t cached = std::min(buf.size(), static_cast<size_t>(READ_BUFFER_SIZE));
        std::memcpy(_read_buffer.get(), buf.data(), cached);
        _buffer_start = offset;
        _buffer_end = offset + cached;
        const size_t copy_len = std::min(remaining, cached);
        std::memcpy(result.data + buffer_offset, _read_buffer.get(), copy_len);
        buffer_offset += copy_len;
    }
    // A short Range response reveals where the file ends.
    if (!_size_known && with_range && buf.size() < req_len) {
        _size_known = true;
        _file_size = offset + buf.size();
    }
    *bytes_read = buffer_offset;
    return Status::OK();
}
// Releases the read-ahead buffer and the HttpClient. Only the first call does the
// teardown (_closed.exchange returns the previous flag); every call returns OK.
Status HttpFileReader::close() {
    const bool was_closed = _closed.exchange(true);
    if (!was_closed) {
        // Drop the READ_BUFFER_SIZE read-ahead window and forget its range.
        _read_buffer.reset();
        _buffer_start = 0;
        _buffer_end = 0;
        // Free the HTTP client; prepare_client() reports an error if it is used afterwards.
        _client.reset();
    }
    return Status::OK();
}
// (Re)initializes the HttpClient for a new request against _url, forwarding
// `set_fail_on_error` to HttpClient::init(), and turns every "http.header.<Name>"
// property into a "<Name>" request header.
// Returns InternalError if the client has already been released by close().
Status HttpFileReader::prepare_client(bool set_fail_on_error) {
    if (_client == nullptr) {
        return Status::InternalError("HttpClient is not initialized");
    }
    RETURN_IF_ERROR(_client->init(_url, set_fail_on_error));

    static constexpr char kHeaderPrefix[] = "http.header.";
    constexpr size_t kHeaderPrefixLen = sizeof(kHeaderPrefix) - 1;
    for (const auto& [key, value] : _extend_kv) {
        // Skip properties that are not custom headers.
        if (key.compare(0, kHeaderPrefixLen, kHeaderPrefix) != 0) {
            continue;
        }
        _client->set_header(key.substr(kHeaderPrefixLen), value);
    }
    return Status::OK();
}
// Probes whether the server honors HTTP Range requests (RFC 7233) by asking for
// the first byte only, and records the outcome in _range_supported:
//   206 Partial Content         -> supported
//   416 Range Not Satisfiable   -> supported: the server evaluated the Range header
//                                  (a server that ignores ranges answers 200). This is
//                                  what a zero-length resource returns for a range at byte 0.
//   200 OK                      -> not supported (the Range header was ignored)
//   anything else               -> logged and treated as not supported
// The probe body is capped at MAX_TEST_SIZE bytes so a server that ignores the
// Range header does not stream the whole object. Returns a non-OK status only
// when the request fails for a reason other than that cap.
Status HttpFileReader::detect_range_support() {
    RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/false));
    _client->set_method(HttpMethod::GET);
    _client->set_range(0, 1); // Request only the first byte
    std::string test_buf;
    size_t received = 0;
    constexpr size_t MAX_TEST_SIZE = 10240; // 10KB max for test
    bool stopped_by_limit = false;
    auto cb = [&](const void* data, size_t len) {
        received += len;
        // Limit test data to prevent downloading too much
        if (received > MAX_TEST_SIZE) {
            stopped_by_limit = true;
            VLOG(2) << "Stopping Range detection test after receiving " << received << " bytes";
            return false; // This will cause CURL to stop with an error
        }
        test_buf.append(reinterpret_cast<const char*>(data), len);
        return true;
    };
    Status exec_status = _client->execute(cb);
    if (!exec_status.ok()) {
        if (!stopped_by_limit) {
            // Real error
            return exec_status;
        }
        // Aborting the transfer at our size cap is expected; the status code is still valid.
        VLOG(1) << "Range detection stopped at size limit (expected): " << exec_status.to_string();
    }
    long http_status = _client->get_http_status();
    if (http_status == 206) {
        // HTTP 206 Partial Content - server supports Range requests
        _range_supported = true;
        VLOG(1) << "Range support detected (HTTP 206) for " << _url << ", received "
                << test_buf.size() << " bytes";
    } else if (http_status == 416) {
        // The server processed the Range header but the range lies outside the
        // representation (typically an empty file), which still proves Range support.
        _range_supported = true;
        VLOG(1) << "Range support detected (HTTP 416, resource is likely empty) for " << _url;
    } else if (http_status == 200) {
        // HTTP 200 OK - server does not support Range requests
        // It returned the full file (or a large portion)
        _range_supported = false;
        VLOG(1) << "Range not supported (HTTP 200) for " << _url << ", received " << test_buf.size()
                << " bytes in test";
        // If we received a lot of data, it's likely the full file
        if (test_buf.size() >= MAX_TEST_SIZE || stopped_by_limit) {
            LOG(WARNING) << "Server returned " << received << "+ bytes for Range test, "
                         << "indicating no Range support for " << _url;
        }
    } else {
        // Unexpected status code
        LOG(WARNING) << "Unexpected HTTP status " << http_status << " during Range detection for "
                     << _url << ", assuming Range is not supported";
        _range_supported = false;
    }
    return Status::OK();
}
} // namespace doris::io