| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <atomic> |
| #include <cstddef> |
| #include <cstdint> |
| #include <deque> |
| #include <functional> |
| #include <map> |
| #include <memory> |
| #include <mutex> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include <glog/logging.h> |
| #include <gtest/gtest_prod.h> |
| |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/subprocess/subprocess.pb.h" |
| #include "kudu/subprocess/subprocess_protocol.h" |
| #include "kudu/util/blocking_queue.h" |
| #include "kudu/util/countdown_latch.h" |
| #include "kudu/util/locks.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/mutex.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/status_callback.h" |
| |
| namespace kudu { |
| |
| class Env; |
| class Fifo; |
| class Subprocess; |
| class Thread; |
| |
| namespace subprocess { |
| |
| typedef int64_t CallId; |
| |
| struct SimpleTimer { |
| SimpleTimer() { |
| start_time = MonoTime::Now(); |
| } |
| MonoTime start_time; |
| MonoDelta elapsed() const { |
| return MonoTime::Now() - start_time; |
| } |
| }; |
| |
| struct SubprocessMetrics { |
| // Metrics returned from the subprocess. |
| scoped_refptr<Histogram> sp_execution_time_ms; |
| scoped_refptr<Histogram> sp_inbound_queue_length; |
| scoped_refptr<Histogram> sp_inbound_queue_time_ms; |
| scoped_refptr<Histogram> sp_outbound_queue_length; |
| scoped_refptr<Histogram> sp_outbound_queue_time_ms; |
| |
| // Metrics recorded by the SubprocessServer. |
| scoped_refptr<Histogram> server_inbound_queue_size_bytes; |
| scoped_refptr<Histogram> server_inbound_queue_time_ms; |
| scoped_refptr<Histogram> server_outbound_queue_size_bytes; |
| scoped_refptr<Histogram> server_outbound_queue_time_ms; |
| }; |
| |
| // Encapsulates the pending state of a request that is in the process of being |
| // sent to a subprocess. These calls are added to an in-flight map before |
| // calling SendRequest(). See the method comments for some discussion about |
| // thread-safety. |
| class SubprocessCall { |
| public: |
| SubprocessCall(const SubprocessRequestPB* req, |
| SubprocessResponsePB* resp, |
| StatusCallback* cb, |
| MonoTime deadline) |
| : id_(req->id()), |
| deadline_(deadline), |
| req_(req), resp_(resp), cb_(cb) {} |
| |
| CallId id() const { |
| return id_; |
| } |
| |
| MonoTime deadline() const { |
| return deadline_; |
| } |
| |
| // Sends the request with the given message protocol, ensuring that the |
| // sending of the request doesn't overlap with the calling of the callback |
| // (which may, in turn, delete the request state). If the request couldn't be |
| // sent, runs the callback with an appropriate error immediately. |
| void SendRequest(SubprocessProtocol* message_protocol) { |
| std::lock_guard<Mutex> l(lock_); |
| // If we've already run the callback, (e.g. we passed the deadline before |
| // sending) we shouldn't try sending the message; the request and response |
| // may have been destructed. |
| if (!cb_) { |
| return; |
| } |
| Status s = message_protocol->SendMessage(*req_); |
| // If we failed to send the message, return the error to the caller. |
| if (PREDICT_FALSE(!s.ok())) { |
| WARN_NOT_OK(s, "failed to send request"); |
| (*cb_)(s); |
| cb_ = nullptr; |
| } |
| } |
| |
| void RespondSuccess(SubprocessResponsePB resp) { |
| std::lock_guard<Mutex> l(lock_); |
| // If we've already run the callback (e.g. we passed the deadline before |
| // responding), there's nothing to do. |
| if (!cb_) { |
| return; |
| } |
| *resp_ = std::move(resp); |
| (*cb_)(Status::OK()); |
| cb_ = nullptr; |
| } |
| |
| void RespondError(const Status& s) { |
| DCHECK(!s.ok()); |
| std::lock_guard<Mutex> l(lock_); |
| // If we've already run the callback (e.g. we failed to send the message), |
| // there's nothing to do. |
| if (!cb_) { |
| return; |
| } |
| (*cb_)(s); |
| cb_ = nullptr; |
| } |
| |
| private: |
| friend struct RequestLogicalSize; |
| |
| // Lock used to ensure that the sending of the request doesn't overlap with |
| // the invocation of the callback. This is important because the callback may |
| // destroy the message state (so it is unsafe to update or dereference 'req_' |
| // or 'resp_' afterwards). Note that it is safe to update 'cb_' after the |
| // callback is invoked. |
| Mutex lock_; |
| |
| // ID of this call. |
| const CallId id_; |
| |
| // Deadline for this call. |
| const MonoTime deadline_; |
| |
| // Request and response associated with this call. |
| const SubprocessRequestPB* req_; |
| SubprocessResponsePB* resp_; |
| |
| // Callback to wake up the caller that enqueued this call. This is called |
| // exactly once per SubprocessCall. |
| StatusCallback* cb_; |
| }; |
| |
| // Used by BlockingQueue to determine the size of messages. |
| typedef std::pair<std::shared_ptr<SubprocessCall>, SimpleTimer> CallAndTimer; |
| struct RequestLogicalSize { |
| static size_t logical_size(const CallAndTimer& call_and_timer) { |
| return call_and_timer.first->req_->ByteSizeLong(); |
| } |
| }; |
| typedef std::pair<SubprocessResponsePB, SimpleTimer> ResponsePBAndTimer; |
| struct ResponseLogicalSize { |
| static size_t logical_size(const ResponsePBAndTimer& resp_and_timer) { |
| return resp_and_timer.first.ByteSizeLong(); |
| } |
| }; |
| |
| typedef BlockingQueue<CallAndTimer, RequestLogicalSize> SubprocessCallQueue; |
| typedef BlockingQueue<ResponsePBAndTimer, ResponseLogicalSize> ResponseQueue; |
| |
| // Wrapper for a subprocess that communicates via protobuf. A server is |
| // comprised of a few things to facilitate concurrent communication with an |
| // underlying subprocess: |
| // |
| // - An outbound queue of SubprocessCalls to send to the subprocess. When a |
| // user enqueues a call, that call is first added to the outbound queue. |
| // |
| // - One "writer" thread: this thread pulls work off of the outbound queue and |
| // writes it to the subprocess pipe. When a SubprocessCall's request is |
| // written to the pipe, the call is tracked, and its callback may be called |
| // at any time by the deadline checker or upon receiving a valid response. |
| // |
| // - One "reader" thread: this thread reads messages from subprocess pipe and |
| // puts it on the inbound response queue. |
| // |
| // - An inbound queue of SubprocessResponsePBs that is populated by the reader |
| // thread. |
| // |
| // - Many "responder" threads: each thread looks for a response on the inbound |
| // queue and calls the appropriate callback for it, based on the response ID. |
| // |
| // - One "deadline-checker" thread: this thread looks through the oldest calls |
| // that have been sent to the subprocess and runs their callbacks with a |
| // TimedOut error if they are past their deadline. |
| // |
| // Public methods are virtual so a mock server can be used in tests. |
| class SubprocessServer { |
| public: |
| // Returns a path based on 'base' that can be used as a fifo, avoiding |
| // collisions between subprocesses started in different process and threads. |
| static std::string FifoPath(const std::string& base); |
| |
| SubprocessServer(Env* env, const std::string& receiver_file, |
| std::vector<std::string> subprocess_argv, SubprocessMetrics metrics); |
| virtual ~SubprocessServer(); |
| |
| // Initialize the server, starting the subprocess and worker threads. |
| virtual Status Init() WARN_UNUSED_RESULT; |
| |
| // Synchronously sends a request to the subprocess and populates 'resp' with |
| // contents returned from the subprocess, or returns an error if anything |
| // failed or timed out along the way. |
| virtual Status Execute(SubprocessRequestPB* req, SubprocessResponsePB* resp) WARN_UNUSED_RESULT; |
| |
| private: |
| FRIEND_TEST(SubprocessServerTest, TestCallsReturnWhenShuttingDown); |
| |
| void StartSubprocessThread(const StatusCallback& cb); |
| |
| // Stop the subprocess and stop processing messages. |
| void Shutdown(); |
| |
| // Add the call to the outbound queue, returning an error if the call timed |
| // out before successfully adding it to the queue, or if the queue is shut |
| // down. |
| // |
| // The call's callback is run asynchronously upon receiving a response from |
| // the subprocess, matched by ID, or when the deadline checker thread detects |
| // that the call has timed out. |
| Status QueueCall(const std::shared_ptr<SubprocessCall>& call) WARN_UNUSED_RESULT; |
| |
| // Long running thread that repeatedly looks at the in-flight call with the |
| // lowest ID, checks whether its deadline has expired, and runs its callback |
| // with a TimedOut error if so. |
| void CheckDeadlinesThread(); |
| |
| // Pulls responses from the inbound response queue and calls the associated |
| // callbacks. |
| void ResponderThread(); |
| |
| // Pulls enqueued calls from the outbound request queue and sends their |
| // associated requests to the subprocess. |
| void SendMessagesThread(); |
| |
| // Receives messages from the subprocess and puts the responses onto the |
| // inbound response queue. |
| void ReceiveMessagesThread(); |
| |
| // Fixed timeout to be used for each call. |
| const MonoDelta call_timeout_; |
| |
| // Next request ID to be assigned. |
| std::atomic<CallId> next_id_; |
| |
| // Latch used to indicate that the server is shutting down. |
| CountDownLatch closing_; |
| |
| Env* env_; |
| const std::string receiver_file_; |
| |
| // The fifo used to receieve messages from the subprocess. |
| std::unique_ptr<Fifo> receiver_fifo_; |
| |
| // The underlying subprocess. |
| std::shared_ptr<Subprocess> process_; |
| |
| // Protocol with which to send and receive bytes to and from 'process_'. |
| std::shared_ptr<SubprocessProtocol> message_protocol_; |
| |
| // Thread that runs the subprocess. Since the subprocess is run via |
| // fork/exec, this thread must stay alive for the lifetime of the server. |
| // Otherwise, the OS may silently kill the spawned child process. |
| scoped_refptr<Thread> start_thread_; |
| |
| // Pulls requests off the request queue and serializes them via the |
| // message protocol. |
| scoped_refptr<Thread> write_thread_; |
| |
| // Reads from the message protocol, constructs the response, and puts it on |
| // the response queue. |
| scoped_refptr<Thread> read_thread_; |
| |
| // Looks at the front of the queue for calls that are past their deadlines |
| // and triggers their callbacks. |
| scoped_refptr<Thread> deadline_checker_; |
| |
| // Pull work off the response queue and trigger the associated callbacks if |
| // appropriate. |
| std::vector<scoped_refptr<Thread>> responder_threads_; |
| |
| // Outbound queue of calls to send to the subprocess. |
| SubprocessCallQueue outbound_call_queue_; |
| |
| // Inbound queue of responses sent by the subprocess. |
| ResponseQueue inbound_response_queue_; |
| |
| // Metrics for this subprocess. |
| SubprocessMetrics metrics_; |
| |
| // Calls that are currently in-flight (the requests are being sent over the |
| // pipe or waiting for a response), ordered by ID. This ordering allows for |
| // lookup by ID, and gives us a rough way to get the calls with earliest |
| // start times which is useful for deadline-checking. |
| // |
| // Only a single thread may remove a given call; that thread must run the |
| // call's callback. |
| simple_spinlock in_flight_lock_; |
| std::map<CallId, std::shared_ptr<SubprocessCall>> call_by_id_; |
| }; |
| |
| } // namespace subprocess |
| } // namespace kudu |