blob: 70611c819e819d01f9245ff4972f94ce95f51ff4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_SERVICE_POOL_H
#define KUDU_SERVICE_POOL_H
#include <string>
#include <vector>
#include "kudu/gutil/macros.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/rpc/rpc_service.h"
#include "kudu/rpc/service_queue.h"
#include "kudu/util/mutex.h"
#include "kudu/util/thread.h"
#include "kudu/util/status.h"
namespace kudu {
class Counter;
class Histogram;
class MetricEntity;
class Socket;
namespace rpc {
class Messenger;
class ServiceIf;
// A pool of threads that handle new incoming RPC calls.
// Also includes a queue that calls get pushed onto for handling by the pool.
class ServicePool : public RpcService {
public:
ServicePool(gscoped_ptr<ServiceIf> service,
const scoped_refptr<MetricEntity>& metric_entity,
size_t service_queue_length);
virtual ~ServicePool();
// Start up the thread pool.
virtual Status Init(int num_threads);
// Shut down the queue and the thread pool.
virtual void Shutdown();
RpcMethodInfo* LookupMethod(const RemoteMethod& method) override;
virtual Status QueueInboundCall(gscoped_ptr<InboundCall> call) OVERRIDE;
const Counter* RpcsTimedOutInQueueMetricForTests() const {
return rpcs_timed_out_in_queue_.get();
}
const Histogram* IncomingQueueTimeMetricForTests() const {
return incoming_queue_time_.get();
}
const Counter* RpcsQueueOverflowMetric() const {
return rpcs_queue_overflow_.get();
}
const std::string service_name() const;
private:
void RunThread();
void RejectTooBusy(InboundCall* c);
gscoped_ptr<ServiceIf> service_;
std::vector<scoped_refptr<kudu::Thread> > threads_;
LifoServiceQueue service_queue_;
scoped_refptr<Histogram> incoming_queue_time_;
scoped_refptr<Counter> rpcs_timed_out_in_queue_;
scoped_refptr<Counter> rpcs_queue_overflow_;
mutable Mutex shutdown_lock_;
bool closing_;
DISALLOW_COPY_AND_ASSIGN(ServicePool);
};
} // namespace rpc
} // namespace kudu
#endif