// blob: 8c43a37677690edbf014db36a61f54dec68749d5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_RPC_THRIFT_SERVER_H
#define IMPALA_RPC_THRIFT_SERVER_H
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <vector>

#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/unordered_map.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <thrift/TProcessor.h>
#include <thrift/server/TServer.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSSLSocket.h>

#include "common/status.h"
#include "util/metrics-fwd.h"
#include "util/thread.h"
// Forward declarations of the Thrift types that this header only mentions by name
// (as shared_ptr template arguments or in friend declarations).
namespace apache {
namespace thrift {

namespace server {
class TAcceptQueueServer;
} // namespace server

namespace protocol {
class TProtocol;
} // namespace protocol

} // namespace thrift
} // namespace apache
namespace impala {
/// Forward declaration: this header only refers to AuthProvider through pointers.
class AuthProvider;
/// Utility class for all Thrift servers. Runs a TAcceptQueueServer server with, by
/// default, no enforced concurrent connection limit, that exposes the interface
/// described by a user-supplied TProcessor object.
///
/// Use a ThriftServerBuilder to construct a ThriftServer. ThriftServer's c'tors are
/// private.
/// TODO: shutdown is buggy (which only harms tests)
class ThriftServer {
 public:
  /// Transport factory that wraps transports in a buffered transport with a customisable
  /// buffer-size and optionally in another transport from a provided factory. A larger
  /// buffer is usually more efficient, as it allows the underlying transports to perform
  /// fewer system calls.
  class BufferedTransportFactory :
      public apache::thrift::transport::TBufferedTransportFactory {
   public:
    /// Buffer size used when the caller does not supply one.
    static const int DEFAULT_BUFFER_SIZE_BYTES = 128 * 1024;

    /// 'buffer_size' is the size, in bytes, handed to every TBufferedTransport created
    /// by this factory. 'wrapped_factory' is stored in a std::unique_ptr, so this object
    /// takes ownership of it. It may be nullptr, in which case transports are buffered
    /// without any additional wrapping.
    BufferedTransportFactory(uint32_t buffer_size = DEFAULT_BUFFER_SIZE_BYTES,
        apache::thrift::transport::TTransportFactory* wrapped_factory =
            new apache::thrift::transport::TTransportFactory())
      : buffer_size_(buffer_size), wrapped_factory_(wrapped_factory) {}

    /// Passes 'trans' through the wrapped factory (if there is one) and returns a new
    /// TBufferedTransport of 'buffer_size_' bytes layered on top of the result.
    boost::shared_ptr<apache::thrift::transport::TTransport> getTransport(
        boost::shared_ptr<apache::thrift::transport::TTransport> trans) override {
      // The constructor accepts any raw pointer, including nullptr; guard against
      // dereferencing an empty unique_ptr by buffering 'trans' directly in that case.
      boost::shared_ptr<apache::thrift::transport::TTransport> wrapped =
          wrapped_factory_ != nullptr ? wrapped_factory_->getTransport(trans) : trans;
      return boost::shared_ptr<apache::thrift::transport::TTransport>(
          new apache::thrift::transport::TBufferedTransport(wrapped, buffer_size_));
    }

   private:
    /// Size, in bytes, of the buffer given to each TBufferedTransport.
    uint32_t buffer_size_;

    /// Factory applied to a transport before it is buffered. Owned by this object.
    std::unique_ptr<apache::thrift::transport::TTransportFactory> wrapped_factory_;
  };

  /// Transport implementation used by the thrift server.
  enum TransportType {
    BINARY, // Thrift bytes over default transport.
    HTTP, // Thrift bytes over HTTP transport.
  };

  /// Username.
  typedef std::string Username;

  /// Per-connection information.
  struct ConnectionContext {
    TUniqueId connection_id;
    Username username;
    Username do_as_user;
    TNetworkAddress network_address;
    std::string server_name;
    /// Used to pass HTTP headers generated by the input transport to the output transport
    /// to be returned.
    std::vector<std::string> return_headers;
  };

  /// Interface class for receiving connection creation / termination events.
  class ConnectionHandlerIf {
   public:
    /// Called when a connection is established (when a client connects).
    virtual void ConnectionStart(const ConnectionContext& connection_context) = 0;

    /// Called when a connection is terminated (when a client closes the connection).
    /// After this callback returns, the memory connection_context references is no longer
    /// valid and clients must not refer to it again.
    virtual void ConnectionEnd(const ConnectionContext& connection_context) = 0;

    /// Returns true if the connection is considered idle. A connection is considered
    /// idle if all the sessions associated with it have expired due to idle timeout.
    /// Called when a client has been inactive for --idle_client_poll_period_s seconds.
    virtual bool IsIdleConnection(const ConnectionContext& connection_context) = 0;

    virtual ~ConnectionHandlerIf() = default;
  };

  /// Returns the port on which the server interface is exposed (see 'port_').
  int port() const { return port_; }

  /// Returns true if the server socket only accepts SSL connections.
  bool ssl_enabled() const { return ssl_enabled_; }

  /// Blocks until the server stops and exits its main thread.
  void Join();

  /// FOR TESTING ONLY; stop the server and block until the server is stopped
  void StopForTesting();

  /// Starts the main server thread. Once this call returns, clients
  /// may connect to this server and issue RPCs. May not be called more
  /// than once.
  Status Start();

  /// Sets the connection handler which receives events when connections are created or
  /// closed. The handler is stored as a raw pointer and is not owned by this server.
  void SetConnectionHandler(ConnectionHandlerIf* connection) {
    connection_handler_ = connection;
  }

  /// Returns true if the current thread has a connection context set on it.
  static bool HasThreadConnectionContext();

  /// Returns a unique identifier for the current connection. A connection is
  /// identified with the lifetime of a socket connection to this server.
  /// It is only safe to call this method during a Thrift processor RPC
  /// implementation. Otherwise, the result of calling this method is undefined.
  /// It is also only safe to reference the returned value during an RPC method.
  static const TUniqueId& GetThreadConnectionId();

  /// Returns a pointer to a struct that contains information about the current
  /// connection. This includes:
  ///   - A unique identifier for the connection.
  ///   - The username provided by the underlying transport for the current connection, or
  ///     an empty string if the transport did not provide a username. Currently, only the
  ///     TSasl transport provides this information.
  ///   - The client connection network address.
  /// It is only safe to call this method during a Thrift processor RPC
  /// implementation. Otherwise, the result of calling this method is undefined.
  /// It is also only safe to reference the returned value during an RPC method.
  static const ConnectionContext* GetThreadConnectionContext();

 private:
  friend class ThriftServerBuilder;
  friend class apache::thrift::server::TAcceptQueueServer;

  /// Helper class which monitors starting servers. Needs access to internal members, and
  /// is not used outside of this class.
  /// NOTE(review): this friend declaration appears before the nested
  /// ThriftServerEventProcessor is declared, so it may name a namespace-scope class
  /// rather than the nested one. The nested class should have member access regardless;
  /// confirm before relying on or removing this line.
  friend class ThriftServerEventProcessor;

  /// Helper class that starts a server in a separate thread, and handles
  /// the inter-thread communication to monitor whether it started
  /// correctly.
  class ThriftServerEventProcessor : public apache::thrift::server::TServerEventHandler {
   public:
    /// 'thrift_server' is the server under management; it is stored as a raw pointer.
    explicit ThriftServerEventProcessor(ThriftServer* thrift_server)
      : thrift_server_(thrift_server), signal_fired_(false) {}

    /// Called by the Thrift server implementation when it has acquired its resources and
    /// is ready to serve, and signals to StartAndWaitForServer that start-up is finished.
    /// From TServerEventHandler.
    void preServe() override;

    /// Called when a client connects; we create per-client state and call any
    /// ConnectionHandlerIf handler.
    void* createContext(
        boost::shared_ptr<apache::thrift::protocol::TProtocol> input,
        boost::shared_ptr<apache::thrift::protocol::TProtocol> output) override;

    /// Called when a client starts an RPC; we set the thread-local connection context.
    void processContext(void* context,
        boost::shared_ptr<apache::thrift::transport::TTransport> output) override;

    /// Called when a client disconnects; we call any ConnectionHandlerIf handler.
    void deleteContext(void* context,
        boost::shared_ptr<apache::thrift::protocol::TProtocol> input,
        boost::shared_ptr<apache::thrift::protocol::TProtocol> output) override;

    /// Returns true if a client's connection is idle. A client's connection is idle iff
    /// all the sessions associated with it have expired due to idle timeout. Called from
    /// TAcceptQueueServer::Task::run() after clients have been inactive for
    /// --idle_client_poll_period_s seconds.
    bool IsIdleContext(void* context);

    /// Waits for a timeout of TIMEOUT_MS for a server to signal that it has started
    /// correctly.
    Status StartAndWaitForServer();

   private:
    /// Lock used to ensure that there are no missed notifications between starting the
    /// supervision thread and calling signal_cond_.WaitUntil. Also used to ensure
    /// thread-safe access to members of thrift_server_
    boost::mutex signal_lock_;

    /// Condition variable that is notified by the supervision thread once either
    /// a) all is well or b) an error occurred.
    ConditionVariable signal_cond_;

    /// The ThriftServer under management. This class is a friend of ThriftServer, and
    /// reaches in to change member variables at will.
    ThriftServer* thrift_server_;

    /// Guards against spurious condition variable wakeups
    bool signal_fired_;

    /// The time, in milliseconds, to wait for a server to come up
    static const int TIMEOUT_MS = 2500;

    /// Called in a separate thread
    void Supervise();
  };

  /// Creates, but does not start, a new server on the specified port
  /// that exports the supplied interface.
  ///  - name: human-readable name of this server. Should not contain spaces
  ///  - processor: Thrift processor to handle RPCs
  ///  - port: The port the server will listen for connections on
  ///  - auth_provider: Authentication scheme to use. If nullptr, use the global default
  ///    daemon<->daemon provider.
  ///  - metrics: if not nullptr, the server will register metrics on this object
  ///  - max_concurrent_connections: The maximum number of concurrent connections allowed.
  ///    If 0, there will be no enforced limit on the number of concurrent connections.
  ///  - queue_timeout_ms: amount of time in milliseconds an accepted client connection
  ///    will be held in the accepted queue, after which the request will be rejected if
  ///    a service thread can't be found. If 0, no timeout is enforced.
  ///  - idle_poll_period_ms: Amount of time, in milliseconds, of client's inactivity
  ///    before the service thread wakes up to check if the connection should be closed
  ///    due to inactivity. If 0, no polling happens.
  ///  - server_transport: underlying transport type used by this server. Defaults to
  ///    TransportType::BINARY.
  ThriftServer(const std::string& name,
      const boost::shared_ptr<apache::thrift::TProcessor>& processor, int port,
      AuthProvider* auth_provider = nullptr, MetricGroup* metrics = nullptr,
      int max_concurrent_connections = 0, int64_t queue_timeout_ms = 0,
      int64_t idle_poll_period_ms = 0,
      TransportType server_transport = TransportType::BINARY);

  /// Enables secure access over SSL. Must be called before Start(). The first three
  /// arguments are the minimum SSL/TLS version, and paths to certificate and private key
  /// files in .PEM format, respectively. If either file does not exist, an error is
  /// returned. The fourth, optional, argument provides the command to run if a password
  /// is required to decrypt the private key. It is invoked once, and the resulting
  /// password is used only for password-protected .PEM files. The final argument is a
  /// string containing a list of cipher suites, separated by commas, to enable.
  Status EnableSsl(apache::thrift::transport::SSLProtocol version,
      const std::string& certificate, const std::string& private_key,
      const std::string& pem_password_cmd = "", const std::string& ciphers = "");

  /// Creates the server socket on which this server listens. May be SSL enabled. Returns
  /// OK unless there was a Thrift error.
  Status CreateSocket(boost::shared_ptr<apache::thrift::transport::TServerSocket>* socket);

  /// True if the server has been successfully started, for internal use only
  bool started_;

  /// The port on which the server interface is exposed. Usually the port that was
  /// passed to the constructor, but if this was the wildcard port 0, then this is
  /// replaced with whatever port number the server is listening on.
  int port_;

  /// True if the server socket only accepts SSL connections
  bool ssl_enabled_;

  /// Path to certificate file in .PEM format
  std::string certificate_path_;

  /// Path to private key file in .PEM format
  std::string private_key_path_;

  /// Password string retrieved by running command in EnableSsl().
  std::string key_password_;

  /// List of ciphers that are ok for clients to use when connecting.
  std::string cipher_list_;

  /// The SSL/TLS protocol client versions that this server will allow to connect.
  apache::thrift::transport::SSLProtocol version_;

  /// Maximum number of concurrent connections (connections will block until fewer than
  /// max_concurrent_connections_ are concurrently active). If 0, there is no enforced
  /// limit.
  int max_concurrent_connections_;

  /// Amount of time in milliseconds an accepted client connection will be kept in the
  /// accept queue before it is timed out. If 0, there is no timeout.
  /// Used in TAcceptQueueServer.
  int64_t queue_timeout_ms_;

  /// Amount of time, in milliseconds, of client's inactivity before the service thread
  /// wakes up to check if the connection should be closed due to inactivity. If 0, no
  /// polling happens.
  int64_t idle_poll_period_ms_;

  /// User-specified identifier that shows up in logs
  const std::string name_;

  /// Identifier used to prefix all metric names that are produced by this server.
  const std::string metrics_name_;

  /// Thread that runs ThriftServerEventProcessor::Supervise() in a separate loop
  std::unique_ptr<Thread> server_thread_;

  /// Thrift housekeeping
  boost::scoped_ptr<apache::thrift::server::TServer> server_;
  boost::shared_ptr<apache::thrift::TProcessor> processor_;

  /// If not nullptr, called when connection events happen. Not owned by us.
  ConnectionHandlerIf* connection_handler_;

  /// Protects connection_contexts_
  boost::mutex connection_contexts_lock_;

  /// Map of active connection context to a shared_ptr containing that context; when an
  /// item is removed from the map, it is automatically freed.
  typedef boost::unordered_map<ConnectionContext*, boost::shared_ptr<ConnectionContext>>
      ConnectionContextSet;
  ConnectionContextSet connection_contexts_;

  /// Metrics subsystem access
  MetricGroup* metrics_;

  /// True if metrics are enabled
  bool metrics_enabled_;

  /// Number of currently active connections
  IntGauge* num_current_connections_metric_;

  /// Total connections made over the lifetime of this server
  IntCounter* total_connections_metric_;

  /// Used to generate a unique connection id for every connection
  boost::uuids::random_generator uuid_generator_;

  /// Not owned by us, owned by the AuthManager
  AuthProvider* auth_provider_;

  /// Underlying transport type used by this thrift server.
  TransportType transport_type_;
};
/// Fluent builder for ThriftServer instances. Every setter stores its argument and
/// returns the builder itself so calls can be chained; Build() then creates the server
/// from the accumulated settings.
class ThriftServerBuilder {
 public:
  /// 'name', 'processor' and 'port' are forwarded to the ThriftServer constructor when
  /// Build() is called. All other settings start at the defaults listed below.
  ThriftServerBuilder(const std::string& name,
      const boost::shared_ptr<apache::thrift::TProcessor>& processor, int port)
    : name_(name), processor_(processor), port_(port) {}

  /// Sets the auth provider for this server. Default is the system global auth provider.
  ThriftServerBuilder& auth_provider(AuthProvider* provider) {
    auth_provider_ = provider;
    return *this;
  }

  /// Sets the metrics instance that this server should register metrics with. Default is
  /// nullptr.
  ThriftServerBuilder& metrics(MetricGroup* metrics) {
    metrics_ = metrics;
    return *this;
  }

  /// Sets the maximum number of concurrent connections for this server. Default is 0,
  /// which means there is no enforced limit.
  ThriftServerBuilder& max_concurrent_connections(int max_concurrent_connections) {
    max_concurrent_connections_ = max_concurrent_connections;
    return *this;
  }

  /// Sets the value passed to the ThriftServer constructor as 'queue_timeout_ms'.
  /// Default is 0.
  ThriftServerBuilder& queue_timeout_ms(int64_t timeout_ms) {
    queue_timeout_ms_ = timeout_ms;
    return *this;
  }

  /// Sets the value passed to the ThriftServer constructor as 'idle_poll_period_ms'.
  /// Default is 0.
  ThriftServerBuilder& idle_poll_period_ms(int64_t timeout_ms) {
    idle_poll_period_ms_ = timeout_ms;
    return *this;
  }

  /// Enables SSL for this server and records the certificate and private key that
  /// Build() hands to ThriftServer::EnableSsl().
  ThriftServerBuilder& ssl(
      const std::string& certificate, const std::string& private_key) {
    enable_ssl_ = true;
    certificate_ = certificate;
    private_key_ = private_key;
    return *this;
  }

  /// Sets the SSL/TLS client version(s) that this server will allow to connect.
  ThriftServerBuilder& ssl_version(apache::thrift::transport::SSLProtocol version) {
    version_ = version;
    return *this;
  }

  /// Sets the command used to compute the password for the SSL private key. Default is
  /// empty, i.e. no password needed.
  ThriftServerBuilder& pem_password_cmd(const std::string& pem_password_cmd) {
    pem_password_cmd_ = pem_password_cmd;
    return *this;
  }

  /// Sets the list of acceptable cipher suites for this server. Default is to use all
  /// available system cipher suites.
  ThriftServerBuilder& cipher_list(const std::string& ciphers) {
    ciphers_ = ciphers;
    return *this;
  }

  /// Sets the underlying transport type for the thrift server.
  ThriftServerBuilder& transport_type(ThriftServer::TransportType transport_type) {
    server_transport_type_ = transport_type;
    return *this;
  }

  /// Constructs a new ThriftServer and puts it in 'server', if construction was
  /// successful, returns an error otherwise. In the error case, 'server' will not have
  /// been set and will not need to be freed, otherwise the caller assumes ownership of
  /// '*server'.
  Status Build(ThriftServer** server) {
    // Held in a unique_ptr until every fallible step has succeeded, so an early return
    // from RETURN_IF_ERROR destroys the half-configured server.
    std::unique_ptr<ThriftServer> new_server(
        new ThriftServer(name_, processor_, port_, auth_provider_, metrics_,
            max_concurrent_connections_, queue_timeout_ms_, idle_poll_period_ms_,
            server_transport_type_));
    if (enable_ssl_) {
      RETURN_IF_ERROR(new_server->EnableSsl(
          version_, certificate_, private_key_, pem_password_cmd_, ciphers_));
    }
    // Success: ownership passes to the caller.
    *server = new_server.release();
    return Status::OK();
  }

 private:
  // Settings forwarded to the ThriftServer constructor by Build().
  int64_t queue_timeout_ms_ = 0;
  int64_t idle_poll_period_ms_ = 0;
  int max_concurrent_connections_ = 0;
  std::string name_;
  boost::shared_ptr<apache::thrift::TProcessor> processor_;
  int port_ = 0;
  ThriftServer::TransportType server_transport_type_ =
      ThriftServer::TransportType::BINARY;
  AuthProvider* auth_provider_ = nullptr;
  MetricGroup* metrics_ = nullptr;

  // SSL settings; only passed to ThriftServer::EnableSsl() when 'enable_ssl_' is true.
  bool enable_ssl_ = false;
  apache::thrift::transport::SSLProtocol version_ =
      apache::thrift::transport::SSLProtocol::TLSv1_0;
  std::string certificate_;
  std::string private_key_;
  std::string pem_password_cmd_;
  std::string ciphers_;
};
/// Helpers for translating the string given for --ssl_minimum_version into Thrift's
/// SSLProtocol.
struct SSLProtoVersions {
  /// Map from version string to the corresponding SSLProtocol value.
  static std::map<std::string, apache::thrift::transport::SSLProtocol> PROTO_MAP;

  /// Returns true if 'protocol' is supported by the version of OpenSSL this binary is
  /// linked to.
  static bool IsSupported(const apache::thrift::transport::SSLProtocol& protocol);

  /// Given a string, find a corresponding SSLProtocol from PROTO_MAP. Returns an error if
  /// one cannot be found. Matching is case-insensitive.
  static Status StringToProtocol(
      const std::string& in, apache::thrift::transport::SSLProtocol* protocol);
};
}
#endif