blob: 2d5e6a32b95e3e5b200da12ba28771fc67a91dc2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <event2/http.h>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include "common/status.h"
#include "http/http_handler.h"
#include "http/http_request.h"
#include "util/byte_buffer.h"
namespace doris {
// Context for storing stream load forward request information
class StreamLoadForwardContext {
public:
StreamLoadForwardContext() = default;
~StreamLoadForwardContext() {
struct evhttp_connection* conn_to_free = nullptr;
{
std::lock_guard<std::mutex> lock(_mutex);
// forward request will be released by libevent automatically
forward_req = nullptr;
if (conn && !connection_closed) {
// Only free connection if it wasn't already released by libevent
conn_to_free = conn;
conn = nullptr;
}
}
// Free connection outside of mutex to avoid deadlock with connection close callback
if (conn_to_free) {
evhttp_connection_free(conn_to_free);
}
}
void set_forward_request(evhttp_request* req) {
std::lock_guard<std::mutex> lock(_mutex);
forward_req = req;
}
evhttp_request* get_forward_request() {
std::lock_guard<std::mutex> lock(_mutex);
return forward_req;
}
void handle_connection_close() {
std::lock_guard<std::mutex> lock(_mutex);
connection_closed = true;
// Connection has been released by libevent automatically, set pointer to null
// to prevent double-free in destructor
conn = nullptr;
}
bool is_connection_closed() const {
std::lock_guard<std::mutex> lock(_mutex);
return connection_closed;
}
struct evhttp_connection* conn {nullptr};
// Original request reference, lifecycle managed by HTTP framework
HttpRequest* original_req {nullptr};
std::string target_host;
uint16_t target_port {0};
// Buffer for collecting response data
std::string response_data;
std::deque<ByteBufferPtr> request_data_chunks;
size_t total_request_size = 0;
private:
mutable std::mutex _mutex;
struct evhttp_request* forward_req {nullptr};
bool connection_closed {false};
};
// Stream Load request forward handler
// Forwards Stream Load requests to other BE nodes
// Supports streaming forward, maintains original request path format: /api/{db}/{table}/_stream_load_forward
class StreamLoadForwardHandler : public HttpHandler {
public:
StreamLoadForwardHandler() = default;
~StreamLoadForwardHandler() override = default;
void handle(HttpRequest* req) override;
bool request_will_be_read_progressively() override { return true; }
int on_header(HttpRequest* req) override;
void on_chunk_data(HttpRequest* req) override;
private:
Status init_forward_request(HttpRequest* req, const std::string& target_host,
uint16_t target_port,
std::shared_ptr<StreamLoadForwardContext>& ctx);
static void forward_request_done(struct evhttp_request* req, void* arg);
static void forward_request_chunked_cb(struct evhttp_request* req, void* arg);
static void forward_connection_close_cb(struct evhttp_connection* conn, void* arg);
// Response helper functions
static void send_complete_response(struct evhttp_request* req, StreamLoadForwardContext* ctx,
int response_code);
static void copy_response_headers(struct evkeyvalq* input_headers,
struct evkeyvalq* output_headers);
Status parse_forward_target(const std::string& forward_to, std::string& host, uint16_t& port);
std::string build_forward_url(HttpRequest* req);
void setup_forward_headers(HttpRequest* req, struct evhttp_request* forward_req,
const std::string& target_host, int target_port);
};
} // namespace doris