blob: 61335f929f38032ecc3bd7d9b7d84e310d8c45d4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// This file was copied from apache::thrift::server::TThreadedServer.cpp v0.9.0, with the
// significant changes noted inline below.
#ifndef IMPALA_RPC_TACCEPTQUEUESERVER_H
#define IMPALA_RPC_TACCEPTQUEUESERVER_H
#include <cstddef>
#include <set>
#include <string>
#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/Thread.h>
#include <thrift/server/TServer.h>
#include <thrift/transport/TServerTransport.h>
#include <boost/shared_ptr.hpp>
#include "util/metrics.h"
namespace apache {
namespace thrift {
namespace server {
using apache::thrift::TProcessor;
using apache::thrift::transport::TServerTransport;
using apache::thrift::transport::TTransportFactory;
using apache::thrift::concurrency::Monitor;
using apache::thrift::concurrency::ThreadFactory;
/**
* In TAcceptQueueServer, the main server thread calls accept() and then immediately
* places the returned TTransport on a queue to be processed by a separate thread,
* asynchronously.
*
* This helps solve IMPALA-4135, where connections were timing out while waiting in the
* OS accept queue, by ensuring that accept() is called as quickly as possible.
*/
class TAcceptQueueServer : public TServer {
 public:
  // Per-connection task type; only forward-declared in this header.
  class Task;

  // TODO: Determine which c'tors are used and remove unused ones.

  // The four constructors below differ along two axes:
  //   - whether the first argument is a processor factory or a single processor, and
  //   - whether an explicit ThreadFactory is supplied.
  // The trailing THRIFT_OVERLOAD_IF parameter is a Thrift macro; presumably it restricts
  // each template to arguments derived from the named base type so that the factory and
  // processor overloads do not collide (see thrift/server/TServer.h to confirm).
  //
  // 'maxTasks' is stored in maxTasks_. A value of 0 (the default) means no limit is
  // enforced; see SetupConnection().

  // Processor-factory overload without an explicit thread factory.
  template <typename ProcessorFactory>
  TAcceptQueueServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
      const boost::shared_ptr<TServerTransport>& serverTransport,
      const boost::shared_ptr<TTransportFactory>& transportFactory,
      const boost::shared_ptr<TProtocolFactory>& protocolFactory,
      int32_t maxTasks = 0,
      THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory));

  // Processor-factory overload with an explicit thread factory.
  template <typename ProcessorFactory>
  TAcceptQueueServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
      const boost::shared_ptr<TServerTransport>& serverTransport,
      const boost::shared_ptr<TTransportFactory>& transportFactory,
      const boost::shared_ptr<TProtocolFactory>& protocolFactory,
      const boost::shared_ptr<ThreadFactory>& threadFactory,
      int32_t maxTasks = 0,
      THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory));

  // Single-processor overload without an explicit thread factory.
  template <typename Processor>
  TAcceptQueueServer(const boost::shared_ptr<Processor>& processor,
      const boost::shared_ptr<TServerTransport>& serverTransport,
      const boost::shared_ptr<TTransportFactory>& transportFactory,
      const boost::shared_ptr<TProtocolFactory>& protocolFactory,
      int32_t maxTasks = 0,
      THRIFT_OVERLOAD_IF(Processor, TProcessor));

  // Single-processor overload with an explicit thread factory.
  template <typename Processor>
  TAcceptQueueServer(const boost::shared_ptr<Processor>& processor,
      const boost::shared_ptr<TServerTransport>& serverTransport,
      const boost::shared_ptr<TTransportFactory>& transportFactory,
      const boost::shared_ptr<TProtocolFactory>& protocolFactory,
      const boost::shared_ptr<ThreadFactory>& threadFactory,
      int32_t maxTasks = 0,
      THRIFT_OVERLOAD_IF(Processor, TProcessor));

  virtual ~TAcceptQueueServer();

  // Runs the server. Per the class comment, the main server thread calls accept() and
  // queues each returned transport for asynchronous setup.
  virtual void serve();

  // Requests shutdown: raises the stop flag, then interrupts the server transport
  // (presumably so that a thread blocked in accept() wakes up and observes the flag).
  void stop() {
    stop_ = true;
    serverTransport_->interrupt();
  }

  // New - Adds a metric for the size of the queue of connections waiting to be set up to
  // the provided MetricGroup, prefixing its key with key_prefix.
  // The parameter type is spelled std::string so that this header does not depend on a
  // using-declaration leaking in from another header.
  void InitMetrics(impala::MetricGroup* metrics, const std::string& key_prefix);

 protected:
  // Shared initialisation invoked from the body of every constructor. Defined out of
  // line.
  void init();

  // This is the work function for the thread pool, which does the work of setting up the
  // connection and starting a thread to handle it. Will block if there are currently
  // maxTasks_ connections and maxTasks_ is non-zero.
  void SetupConnection(boost::shared_ptr<TTransport> client);

  // Factory used to create threads. Only the constructor overloads that take a
  // ThreadFactory set this in their initializer list.
  boost::shared_ptr<ThreadFactory> threadFactory_;

  // Set to true by stop().
  // NOTE(review): 'volatile' does not provide inter-thread synchronisation in C++; if
  // this flag is read from a thread other than the one calling stop(), consider an
  // atomic type. Left unchanged here to keep the protected interface stable.
  volatile bool stop_;

  // Monitor protecting tasks_, notified on removal.
  Monitor tasksMonitor_;

  // Set of Task pointers guarded by tasksMonitor_.
  std::set<Task*> tasks_;

  // The maximum number of running tasks allowed at a time.
  const int32_t maxTasks_;

  /// New - True if metrics are enabled
  bool metrics_enabled_;

  /// New - Number of connections that have been accepted and are waiting to be set up.
  impala::IntGauge* queue_size_metric_;
};
// Processor-factory overload without an explicit thread factory. threadFactory_ is not
// named in the initializer list and is therefore default-constructed.
//
// stop_, metrics_enabled_ and queue_size_metric_ are given defined values here (in member
// declaration order) so that they never hold indeterminate values, independent of what
// init() -- whose body is not visible in this header -- assigns afterwards.
template <typename ProcessorFactory>
TAcceptQueueServer::TAcceptQueueServer(
    const boost::shared_ptr<ProcessorFactory>& processorFactory,
    const boost::shared_ptr<TServerTransport>& serverTransport,
    const boost::shared_ptr<TTransportFactory>& transportFactory,
    const boost::shared_ptr<TProtocolFactory>& protocolFactory,
    int32_t maxTasks,
    THRIFT_OVERLOAD_IF_DEFN(ProcessorFactory, TProcessorFactory))
  : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
    stop_(false),
    maxTasks_(maxTasks),
    metrics_enabled_(false),
    queue_size_metric_(NULL) {
  init();
}
// Processor-factory overload with an explicit thread factory, which is copied into
// threadFactory_.
//
// stop_, metrics_enabled_ and queue_size_metric_ are given defined values here (in member
// declaration order) so that they never hold indeterminate values, independent of what
// init() -- whose body is not visible in this header -- assigns afterwards.
template <typename ProcessorFactory>
TAcceptQueueServer::TAcceptQueueServer(
    const boost::shared_ptr<ProcessorFactory>& processorFactory,
    const boost::shared_ptr<TServerTransport>& serverTransport,
    const boost::shared_ptr<TTransportFactory>& transportFactory,
    const boost::shared_ptr<TProtocolFactory>& protocolFactory,
    const boost::shared_ptr<ThreadFactory>& threadFactory,
    int32_t maxTasks,
    THRIFT_OVERLOAD_IF_DEFN(ProcessorFactory, TProcessorFactory))
  : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
    threadFactory_(threadFactory),
    stop_(false),
    maxTasks_(maxTasks),
    metrics_enabled_(false),
    queue_size_metric_(NULL) {
  init();
}
// Single-processor overload without an explicit thread factory. threadFactory_ is not
// named in the initializer list and is therefore default-constructed.
//
// stop_, metrics_enabled_ and queue_size_metric_ are given defined values here (in member
// declaration order) so that they never hold indeterminate values, independent of what
// init() -- whose body is not visible in this header -- assigns afterwards.
template <typename Processor>
TAcceptQueueServer::TAcceptQueueServer(
    const boost::shared_ptr<Processor>& processor,
    const boost::shared_ptr<TServerTransport>& serverTransport,
    const boost::shared_ptr<TTransportFactory>& transportFactory,
    const boost::shared_ptr<TProtocolFactory>& protocolFactory,
    int32_t maxTasks,
    THRIFT_OVERLOAD_IF_DEFN(Processor, TProcessor))
  : TServer(processor, serverTransport, transportFactory, protocolFactory),
    stop_(false),
    maxTasks_(maxTasks),
    metrics_enabled_(false),
    queue_size_metric_(NULL) {
  init();
}
// Single-processor overload with an explicit thread factory, which is copied into
// threadFactory_.
//
// stop_, metrics_enabled_ and queue_size_metric_ are given defined values here (in member
// declaration order) so that they never hold indeterminate values, independent of what
// init() -- whose body is not visible in this header -- assigns afterwards.
template <typename Processor>
TAcceptQueueServer::TAcceptQueueServer(
    const boost::shared_ptr<Processor>& processor,
    const boost::shared_ptr<TServerTransport>& serverTransport,
    const boost::shared_ptr<TTransportFactory>& transportFactory,
    const boost::shared_ptr<TProtocolFactory>& protocolFactory,
    const boost::shared_ptr<ThreadFactory>& threadFactory,
    int32_t maxTasks,
    THRIFT_OVERLOAD_IF_DEFN(Processor, TProcessor))
  : TServer(processor, serverTransport, transportFactory, protocolFactory),
    threadFactory_(threadFactory),
    stop_(false),
    maxTasks_(maxTasks),
    metrics_enabled_(false),
    queue_size_metric_(NULL) {
  init();
}
} // namespace server
} // namespace thrift
} // namespace apache
#endif // #ifndef IMPALA_RPC_TACCEPTQUEUESERVER_H