| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| // This file was copied from apache::thrift::server::TThreadedServer.cpp v0.9.0, with the |
| // significant changes noted inline below. |
| |
| #include "rpc/TAcceptQueueServer.h" |
| |
| #include <gutil/walltime.h> |
| #include <thrift/concurrency/PlatformThreadFactory.h> |
| #include <thrift/transport/TSocket.h> |
| |
| #include "util/histogram-metric.h" |
| #include "util/metrics.h" |
| #include "util/stopwatch.h" |
| #include "rpc/thrift-util.h" |
| #include "rpc/thrift-server.h" |
| #include "util/thread-pool.h" |
| |
| #include "common/names.h" |
| |
| DEFINE_int32(accepted_cnxn_queue_depth, 10000, |
| "(Advanced) The size of the post-accept, pre-setup connection queue in each thrift " |
| "server set up to service Impala internal and external connections."); |
| |
| DEFINE_int32(accepted_cnxn_setup_thread_pool_size, 2, |
| "(Advanced) The size of the thread pool that is used to process the " |
| "post-accept, pre-setup connection queue in each thrift server set up to service " |
| "Impala internal and external connections."); |
| |
| namespace apache { |
| namespace thrift { |
| namespace server { |
| |
| using boost::shared_ptr; |
| using namespace std; |
| using namespace apache::thrift; |
| using namespace apache::thrift::protocol; |
| using namespace apache::thrift::transport; |
| using namespace apache::thrift::concurrency; |
| using namespace impala; |
| |
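| // Task: services a single accepted client connection. Each Task runs on its own |
| // thread, created in SetupConnection(), and keeps invoking the processor until the |
| // client disconnects, an error occurs, or the connection is found to be idle. |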
| class TAcceptQueueServer::Task : public Runnable { |
| public: |
| Task(TAcceptQueueServer& server, shared_ptr<TProcessor> processor, |
| shared_ptr<TProtocol> input, shared_ptr<TProtocol> output, |
| shared_ptr<TTransport> transport) |
| : server_(server), |
| processor_(std::move(processor)), |
| input_(std::move(input)), |
| output_(std::move(output)), |
| transport_(std::move(transport)) {} |
| |
| ~Task() override = default; |
| |
| void run() override { |
| boost::shared_ptr<TServerEventHandler> eventHandler = server_.getEventHandler(); |
| void* connectionContext = nullptr; |
| if (eventHandler != nullptr) { |
| connectionContext = eventHandler->createContext(input_, output_); |
| } |
| try { |
| for (;;) { |
| if (eventHandler != nullptr) { |
| eventHandler->processContext(connectionContext, transport_); |
| } |
| // Setting a socket timeout for process() may lead to false positives |
| // and prematurely close a slow client's connection. |
| if (!processor_->process(input_, output_, connectionContext) || |
| !Peek(input_, connectionContext, eventHandler)) { |
| break; |
| } |
| } |
| } catch (const TTransportException& ttx) { |
| if (ttx.getType() != TTransportException::END_OF_FILE) { |
| string errStr = string("TAcceptQueueServer client died: ") + ttx.what(); |
| GlobalOutput(errStr.c_str()); |
| } |
| } catch (const std::exception& x) { |
| GlobalOutput.printf( |
| "TAcceptQueueServer exception: %s: %s", typeid(x).name(), x.what()); |
| } catch (...) { |
| GlobalOutput("TAcceptQueueServer uncaught exception."); |
| } |
| |
| try { |
| input_->getTransport()->close(); |
| } catch (const TTransportException& ttx) { |
| string errStr = string("TAcceptQueueServer input close failed: ") + ttx.what(); |
| GlobalOutput(errStr.c_str()); |
| } |
| try { |
| output_->getTransport()->close(); |
| } catch (const TTransportException& ttx) { |
| string errStr = string("TAcceptQueueServer output close failed: ") + ttx.what(); |
| GlobalOutput(errStr.c_str()); |
| } |
| |
| // Delete the context after closing the transports in case they have references to it. |
| if (eventHandler != nullptr) { |
| eventHandler->deleteContext(connectionContext, input_, output_); |
| } |
| |
| // Remove this task from parent bookkeeping |
| { |
| Synchronized s(server_.tasksMonitor_); |
| server_.tasks_.erase(this); |
| server_.tasksMonitor_.notify(); |
| } |
| } |
| |
| private: |
| |
| // This function blocks until some bytes show up from the client. |
| // Returns true if some bytes are available from the client; |
| // returns false upon reading EOF, in which case the connection |
| // will be closed by the caller. |
| // |
| // If idle_poll_period_ms_ is not 0, this function will block up |
| // to idle_poll_period_ms_ milliseconds before waking up to check |
| // if the sessions associated with the connection have all expired |
| // due to inactivity. If so, it will return false and the connection |
| // will be closed by the caller. |
| bool Peek(shared_ptr<TProtocol> input, void* connectionContext, |
| boost::shared_ptr<TServerEventHandler> eventHandler) { |
| // Set a timeout on input socket if idle_poll_period_ms_ is non-zero. |
| TSocket* socket = static_cast<TSocket*>(transport_.get()); |
| if (server_.idle_poll_period_ms_ > 0) { |
| socket->setRecvTimeout(server_.idle_poll_period_ms_); |
| } |
| |
| // Block until some bytes show up or EOF or timeout. |
| bool bytes_pending = true; |
| for (;;) { |
| try { |
| bytes_pending = input_->getTransport()->peek(); |
| break; |
| } catch (const TTransportException& ttx) { |
| // Implementation of the underlying transport's peek() may call either |
| // read() or peek() of the socket. |
| if (eventHandler != nullptr && server_.idle_poll_period_ms_ > 0 && |
| (IsReadTimeoutTException(ttx) || IsPeekTimeoutTException(ttx))) { |
| ThriftServer::ThriftServerEventProcessor* thriftServerHandler = |
| static_cast<ThriftServer::ThriftServerEventProcessor*>(eventHandler.get()); |
| if (thriftServerHandler->IsIdleContext(connectionContext)) { |
| const string& client = socket->getSocketInfo(); |
| GlobalOutput.printf( |
| "TAcceptQueueServer closing connection to idle client %s", client.c_str()); |
| bytes_pending = false; |
| break; |
| } |
| } else { |
| // Rethrow the exception to be handled by callers. |
| throw; |
| } |
| } |
| } |
| // Unset the socket timeout. |
| if (server_.idle_poll_period_ms_ > 0) socket->setRecvTimeout(0); |
| return bytes_pending; |
| } |
| |
| TAcceptQueueServer& server_; |
| friend class TAcceptQueueServer; |
| |
| shared_ptr<TProcessor> processor_; |
| shared_ptr<TProtocol> input_; |
| shared_ptr<TProtocol> output_; |
| shared_ptr<TTransport> transport_; |
| }; |
| |
| TAcceptQueueServer::TAcceptQueueServer(const boost::shared_ptr<TProcessor>& processor, |
| const boost::shared_ptr<TServerTransport>& serverTransport, |
| const boost::shared_ptr<TTransportFactory>& transportFactory, |
| const boost::shared_ptr<TProtocolFactory>& protocolFactory, |
| const boost::shared_ptr<ThreadFactory>& threadFactory, const string& name, |
| int32_t maxTasks, int64_t queue_timeout_ms, int64_t idle_poll_period_ms) |
| : TServer(processor, serverTransport, transportFactory, protocolFactory), |
| threadFactory_(threadFactory), name_(name), maxTasks_(maxTasks), |
| queue_timeout_ms_(queue_timeout_ms), idle_poll_period_ms_(idle_poll_period_ms) { |
| init(); |
| } |
| |
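| // Creates a default thread factory if none was provided to the constructor. |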
| void TAcceptQueueServer::init() { |
| if (!threadFactory_) { |
| threadFactory_.reset(new PlatformThreadFactory); |
| } |
| } |
| |
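| // Closes any of the given transports that are non-null and logs 'error'. Called when |
| // connection setup fails or the server is too busy to service a new connection. |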
| void TAcceptQueueServer::CleanupAndClose(const string& error, |
| shared_ptr<TTransport> input, shared_ptr<TTransport> output, |
| shared_ptr<TTransport> client) { |
| if (input != nullptr) { |
| input->close(); |
| } |
| if (output != nullptr) { |
| output->close(); |
| } |
| if (client != nullptr) { |
| client->close(); |
| } |
| GlobalOutput(error.c_str()); |
| } |
| |
| // New. |
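| // Runs on the connection setup thread pool: creates the transports, protocols and |
| // processor for a queued connection, waits (up to the queue timeout) for a free slot |
| // if the maxTasks_ limit has been reached, and then starts a dedicated thread that |
| // runs the Task servicing this connection. |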
| void TAcceptQueueServer::SetupConnection(shared_ptr<TAcceptQueueEntry> entry) { |
| if (metrics_enabled_) queue_size_metric_->Increment(-1); |
| shared_ptr<TTransport> inputTransport; |
| shared_ptr<TTransport> outputTransport; |
| shared_ptr<TTransport> client = entry->client_; |
| const string& socket_info = reinterpret_cast<TSocket*>(client.get())->getSocketInfo(); |
| VLOG(1) << Substitute("TAcceptQueueServer: $0 started connection setup for client $1", |
| name_, socket_info); |
| try { |
| MonotonicStopWatch timer; |
| // Start timing for connection setup. |
| timer.Start(); |
| inputTransport = inputTransportFactory_->getTransport(client); |
| outputTransport = outputTransportFactory_->getTransport(client); |
| shared_ptr<TProtocol> inputProtocol = |
| inputProtocolFactory_->getProtocol(inputTransport); |
| shared_ptr<TProtocol> outputProtocol = |
| outputProtocolFactory_->getProtocol(outputTransport); |
| shared_ptr<TProcessor> processor = |
| getProcessor(inputProtocol, outputProtocol, client); |
| |
| if (metrics_enabled_) { |
| cnxns_setup_time_us_metric_->Update(timer.ElapsedTime() / NANOS_PER_MICRO); |
| } |
| VLOG(1) << Substitute("TAcceptQueueServer: $0 finished connection setup for " |
| "client $1", name_, socket_info); |
| |
| TAcceptQueueServer::Task* task = new TAcceptQueueServer::Task( |
| *this, processor, inputProtocol, outputProtocol, client); |
| |
| // Wrap the task in a Runnable so it can be handed to the thread factory. |
| shared_ptr<Runnable> runnable = shared_ptr<Runnable>(task); |
| |
| // Create a thread for this task |
| shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(runnable)); |
| |
| // Insert the task into the set of running tasks. |
| // Start timing the wait duration for service threads. |
| timer.Reset(); |
| { |
| Synchronized s(tasksMonitor_); |
| int64_t wait_time_ms = 0; |
| |
| while (maxTasks_ > 0 && tasks_.size() >= maxTasks_) { |
| if (entry->expiration_time_ != 0) { |
| // We don't want wait_time_ms to 'accidentally' go non-positive, |
| // so wait for at least 1ms. |
| wait_time_ms = std::max(1L, entry->expiration_time_ - MonotonicMillis()); |
| } |
| LOG_EVERY_N(INFO, 10) << name_ << ": All " << maxTasks_ |
| << " server threads are in use. " |
| << "Waiting for " << wait_time_ms << " milliseconds."; |
| int wait_result = tasksMonitor_.waitForTimeRelative(wait_time_ms); |
| if (wait_result == THRIFT_ETIMEDOUT) { |
| if (metrics_enabled_) { |
| thread_wait_time_us_metric_->Update(timer.ElapsedTime() / NANOS_PER_MICRO); |
| timedout_cnxns_metric_->Increment(1); |
| } |
| LOG(INFO) << name_ << ": Server busy. Timing out connection request."; |
| string errStr = "TAcceptQueueServer: " + name_ + " server busy"; |
| CleanupAndClose(errStr, inputTransport, outputTransport, client); |
| return; |
| } |
| } |
| tasks_.insert(task); |
| } |
| if (metrics_enabled_) { |
| thread_wait_time_us_metric_->Update(timer.ElapsedTime() / NANOS_PER_MICRO); |
| } |
| |
| // Start the thread! |
| thread->start(); |
| } catch (const TException& tx) { |
| string errStr = Substitute("TAcceptQueueServer: $0 connection setup failed for " |
| "client $1. Caught TException: $2", name_, socket_info, string(tx.what())); |
| CleanupAndClose(errStr, inputTransport, outputTransport, client); |
| } catch (const string& s) { |
| string errStr = Substitute("TAcceptQueueServer: $0 connection setup failed for " |
| "client $1. Unknown exception: $2", name_, socket_info, s); |
| CleanupAndClose(errStr, inputTransport, outputTransport, client); |
| } |
| } |
| |
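| // Main accept loop: accepts new connections and enqueues them on a thread pool that |
| // runs SetupConnection(). When the server is stopped, the server transport is closed |
| // and serve() waits for all outstanding tasks to finish before returning. |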
| void TAcceptQueueServer::serve() { |
| // Start the server listening |
| serverTransport_->listen(); |
| |
| // Run the preServe event |
| if (eventHandler_ != nullptr) { |
| eventHandler_->preServe(); |
| } |
| |
| if (FLAGS_accepted_cnxn_setup_thread_pool_size > 1) { |
| LOG(INFO) << "accepted_cnxn_setup_thread_pool_size is set to " |
| << FLAGS_accepted_cnxn_setup_thread_pool_size; |
| } |
| |
| // New - this is the thread pool used to process the internal accept queue. |
| ThreadPool<shared_ptr<TAcceptQueueEntry>> connection_setup_pool("setup-server", |
| "setup-worker", FLAGS_accepted_cnxn_setup_thread_pool_size, |
| FLAGS_accepted_cnxn_queue_depth, |
| [this](int tid, const shared_ptr<TAcceptQueueEntry>& item) { |
| this->SetupConnection(item); |
| }); |
| // Initialize the thread pool |
| Status status = connection_setup_pool.Init(); |
| if (!status.ok()) { |
| status.AddDetail("TAcceptQueueServer: thread pool could not start."); |
| string errStr = status.GetDetail(); |
| GlobalOutput(errStr.c_str()); |
| stop_ = true; |
| } |
| |
| while (!stop_) { |
| try { |
| // Fetch client from server |
| shared_ptr<TTransport> client = serverTransport_->accept(); |
| |
| TSocket* socket = reinterpret_cast<TSocket*>(client.get()); |
| VLOG(1) << Substitute("New connection to server $0 from client $1", |
| name_, socket->getSocketInfo()); |
| |
| shared_ptr<TAcceptQueueEntry> entry{new TAcceptQueueEntry}; |
| entry->client_ = client; |
| if (queue_timeout_ms_ > 0) { |
| entry->expiration_time_ = MonotonicMillis() + queue_timeout_ms_; |
| } |
| |
| // New - the work done to set up the connection has been moved to SetupConnection. |
| // Note that 'entry' is moved, so it is owned by the connection setup thread. |
| if (!connection_setup_pool.Offer(std::move(entry))) { |
| string errStr = string("TAcceptQueueServer: thread pool unexpectedly shut down."); |
| GlobalOutput(errStr.c_str()); |
| stop_ = true; |
| break; |
| } |
| if (metrics_enabled_) queue_size_metric_->Increment(1); |
| } catch (const TTransportException& ttx) { |
| if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) { |
| string errStr = |
| string("TAcceptQueueServer: TServerTransport died on accept: ") + ttx.what(); |
| GlobalOutput(errStr.c_str()); |
| } |
| continue; |
| } catch (const TException& tx) { |
| string errStr = string("TAcceptQueueServer: Caught TException: ") + tx.what(); |
| GlobalOutput(errStr.c_str()); |
| continue; |
| } catch (const string& s) { |
| string errStr = "TAcceptQueueServer: Unknown exception: " + s; |
| GlobalOutput(errStr.c_str()); |
| break; |
| } |
| } |
| |
| // If stopped manually, make sure to close server transport |
| if (stop_) { |
| try { |
| serverTransport_->close(); |
| connection_setup_pool.Shutdown(); |
| } catch (TException& tx) { |
| string errStr = string("TAcceptQueueServer: Exception shutting down: ") + tx.what(); |
| GlobalOutput(errStr.c_str()); |
| } |
| try { |
| Synchronized s(tasksMonitor_); |
| while (!tasks_.empty()) { |
| tasksMonitor_.wait(); |
| } |
| } catch (const TException& tx) { |
| string errStr = |
| string("TAcceptQueueServer: Exception joining workers: ") + tx.what(); |
| GlobalOutput(errStr.c_str()); |
| } |
| stop_ = false; |
| } |
| } |
| |
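| // Registers this server's metrics (connection setup queue size, timed-out connection |
| // requests, connection setup time and service thread wait time) under 'key_prefix' |
| // and enables metric updates. |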
| void TAcceptQueueServer::InitMetrics(MetricGroup* metrics, const string& key_prefix) { |
| DCHECK(metrics != nullptr); |
| stringstream queue_size_ss; |
| queue_size_ss << key_prefix << ".connection-setup-queue-size"; |
| queue_size_metric_ = metrics->AddGauge(queue_size_ss.str(), 0); |
| stringstream timedout_cnxns_ss; |
| timedout_cnxns_ss << key_prefix << ".timedout-cnxn-requests"; |
| timedout_cnxns_metric_ = metrics->AddGauge(timedout_cnxns_ss.str(), 0); |
| const int max_histogram_value_us = 5 * 60 * MICROS_PER_SEC; |
| stringstream cnxns_setup_time_ss; |
| cnxns_setup_time_ss << key_prefix << ".connection-setup-time"; |
| cnxns_setup_time_us_metric_ = metrics->RegisterMetric(new HistogramMetric( |
| MetricDefs::Get(cnxns_setup_time_ss.str()), max_histogram_value_us, 1)); |
| stringstream thread_wait_time_ss; |
| thread_wait_time_ss << key_prefix << ".svc-thread-wait-time"; |
| thread_wait_time_us_metric_ = metrics->RegisterMetric(new HistogramMetric( |
| MetricDefs::Get(thread_wait_time_ss.str()), max_histogram_value_us, 1)); |
| metrics_enabled_ = true; |
| } |
| |
| } // namespace server |
| } // namespace thrift |
| } // namespace apache |