| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #ifndef IMPALA_RPC_THRIFT_SERVER_H |
| #define IMPALA_RPC_THRIFT_SERVER_H |
| |
#include <cstdint>
#include <map>
#include <memory>
#include <string>

#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/unordered_map.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <thrift/TProcessor.h>
#include <thrift/server/TServer.h>
#include <thrift/transport/TSSLSocket.h>

#include "common/status.h"
#include "util/metrics.h"
#include "util/thread.h"
| |
| namespace impala { |
| |
| class AuthProvider; |
| |
| /// Utility class for all Thrift servers. Runs a TAcceptQueueServer server with, by |
| /// default, no enforced concurrent connection limit, that exposes the interface |
| /// described by a user-supplied TProcessor object. |
| /// |
| /// Use a ThriftServerBuilder to construct a ThriftServer. ThriftServer's c'tors are |
| /// private. |
| /// TODO: shutdown is buggy (which only harms tests) |
| class ThriftServer { |
| public: |
| /// Transport factory that procuess buffered transports with a customisable |
| /// buffer-size. A larger buffer is usually more efficient, as it allows the underlying |
| /// transports to perform fewer system calls. |
| class BufferedTransportFactory : |
| public apache::thrift::transport::TBufferedTransportFactory { |
| public: |
| static const int DEFAULT_BUFFER_SIZE_BYTES = 128 * 1024; |
| |
| BufferedTransportFactory(uint32_t buffer_size = DEFAULT_BUFFER_SIZE_BYTES) : |
| buffer_size_(buffer_size) { } |
| |
| virtual boost::shared_ptr<apache::thrift::transport::TTransport> getTransport( |
| boost::shared_ptr<apache::thrift::transport::TTransport> trans) { |
| return boost::shared_ptr<apache::thrift::transport::TTransport>( |
| new apache::thrift::transport::TBufferedTransport(trans, buffer_size_)); |
| } |
| private: |
| uint32_t buffer_size_; |
| }; |
| |
| /// Username. |
| typedef std::string Username; |
| |
| /// Per-connection information. |
| struct ConnectionContext { |
| TUniqueId connection_id; |
| Username username; |
| TNetworkAddress network_address; |
| std::string server_name; |
| }; |
| |
| /// Interface class for receiving connection creation / termination events. |
| class ConnectionHandlerIf { |
| public: |
| /// Called when a connection is established (when a client connects). |
| virtual void ConnectionStart(const ConnectionContext& connection_context) = 0; |
| |
| /// Called when a connection is terminated (when a client closes the connection). |
| /// After this callback returns, the memory connection_context references is no longer |
| /// valid and clients must not refer to it again. |
| virtual void ConnectionEnd(const ConnectionContext& connection_context) = 0; |
| |
| virtual ~ConnectionHandlerIf() = default; |
| }; |
| |
| int port() const { return port_; } |
| |
| bool ssl_enabled() const { return ssl_enabled_; } |
| |
| /// Blocks until the server stops and exits its main thread. |
| void Join(); |
| |
| /// FOR TESTING ONLY; stop the server and block until the server is stopped |
| void StopForTesting(); |
| |
| /// Starts the main server thread. Once this call returns, clients |
| /// may connect to this server and issue RPCs. May not be called more |
| /// than once. |
| Status Start(); |
| |
| /// Sets the connection handler which receives events when connections are created or |
| /// closed. |
| void SetConnectionHandler(ConnectionHandlerIf* connection) { |
| connection_handler_ = connection; |
| } |
| |
| /// Returns a unique identifier for the current connection. A connection is |
| /// identified with the lifetime of a socket connection to this server. |
| /// It is only safe to call this method during a Thrift processor RPC |
| /// implementation. Otherwise, the result of calling this method is undefined. |
| /// It is also only safe to reference the returned value during an RPC method. |
| static const TUniqueId& GetThreadConnectionId(); |
| |
| /// Returns a pointer to a struct that contains information about the current |
| /// connection. This includes: |
| /// - A unique identifier for the connection. |
| /// - The username provided by the underlying transport for the current connection, or |
| /// an empty string if the transport did not provide a username. Currently, only the |
| /// TSasl transport provides this information. |
| /// - The client connection network address. |
| /// It is only safe to call this method during a Thrift processor RPC |
| /// implementation. Otherwise, the result of calling this method is undefined. |
| /// It is also only safe to reference the returned value during an RPC method. |
| static const ConnectionContext* GetThreadConnectionContext(); |
| |
| private: |
| friend class ThriftServerBuilder; |
| |
| /// Creates, but does not start, a new server on the specified port |
| /// that exports the supplied interface. |
| /// - name: human-readable name of this server. Should not contain spaces |
| /// - processor: Thrift processor to handle RPCs |
| /// - port: The port the server will listen for connections on |
| /// - auth_provider: Authentication scheme to use. If nullptr, use the global default |
| /// demon<->demon provider. |
| /// - metrics: if not nullptr, the server will register metrics on this object |
| /// - max_concurrent_connections: The maximum number of concurrent connections allowed. |
| /// If 0, there will be no enforced limit on the number of concurrent connections. |
| ThriftServer(const std::string& name, |
| const boost::shared_ptr<apache::thrift::TProcessor>& processor, int port, |
| AuthProvider* auth_provider = nullptr, MetricGroup* metrics = nullptr, |
| int max_concurrent_connections = 0); |
| |
| /// Enables secure access over SSL. Must be called before Start(). The first three |
| /// arguments are the minimum SSL/TLS version, and paths to certificate and private key |
| /// files in .PEM format, respectively. If either file does not exist, an error is |
| /// returned. The fourth, optional, argument provides the command to run if a password |
| /// is required to decrypt the private key. It is invoked once, and the resulting |
| /// password is used only for password-protected .PEM files. The final argument is a |
| /// string containing a list of cipher suites, separated by commas, to enable. |
| Status EnableSsl(apache::thrift::transport::SSLProtocol version, |
| const std::string& certificate, const std::string& private_key, |
| const std::string& pem_password_cmd = "", const std::string& ciphers = ""); |
| |
| /// Creates the server socket on which this server listens. May be SSL enabled. Returns |
| /// OK unless there was a Thrift error. |
| Status CreateSocket( |
| boost::shared_ptr<apache::thrift::transport::TServerTransport>* socket); |
| |
| /// True if the server has been successfully started, for internal use only |
| bool started_; |
| |
| /// The port on which the server interface is exposed |
| int port_; |
| |
| /// True if the server socket only accepts SSL connections |
| bool ssl_enabled_; |
| |
| /// Path to certificate file in .PEM format |
| std::string certificate_path_; |
| |
| /// Path to private key file in .PEM format |
| std::string private_key_path_; |
| |
| /// Password string retrieved by running command in EnableSsl(). |
| std::string key_password_; |
| |
| /// List of ciphers that are ok for clients to use when connecting. |
| std::string cipher_list_; |
| |
| /// The SSL/TLS protocol client versions that this server will allow to connect. |
| apache::thrift::transport::SSLProtocol version_; |
| |
| /// Maximum number of concurrent connections (connections will block until fewer than |
| /// max_concurrent_connections_ are concurrently active). If 0, there is no enforced |
| /// limit. |
| int max_concurrent_connections_; |
| |
| /// User-specified identifier that shows up in logs |
| const std::string name_; |
| |
| /// Thread that runs ThriftServerEventProcessor::Supervise() in a separate loop |
| std::unique_ptr<Thread> server_thread_; |
| |
| /// Thrift housekeeping |
| boost::scoped_ptr<apache::thrift::server::TServer> server_; |
| boost::shared_ptr<apache::thrift::TProcessor> processor_; |
| |
| /// If not nullptr, called when connection events happen. Not owned by us. |
| ConnectionHandlerIf* connection_handler_; |
| |
| /// Protects connection_contexts_ |
| boost::mutex connection_contexts_lock_; |
| |
| /// Map of active connection context to a shared_ptr containing that context; when an |
| /// item is removed from the map, it is automatically freed. |
| typedef boost::unordered_map<ConnectionContext*, boost::shared_ptr<ConnectionContext>> |
| ConnectionContextSet; |
| ConnectionContextSet connection_contexts_; |
| |
| /// Metrics subsystem access |
| MetricGroup* metrics_; |
| |
| /// True if metrics are enabled |
| bool metrics_enabled_; |
| |
| /// Number of currently active connections |
| IntGauge* num_current_connections_metric_; |
| |
| /// Total connections made over the lifetime of this server |
| IntCounter* total_connections_metric_; |
| |
| /// Used to generate a unique connection id for every connection |
| boost::uuids::random_generator uuid_generator_; |
| |
| /// Not owned by us, owned by the AuthManager |
| AuthProvider* auth_provider_; |
| |
| /// Helper class which monitors starting servers. Needs access to internal members, and |
| /// is not used outside of this class. |
| class ThriftServerEventProcessor; |
| friend class ThriftServerEventProcessor; |
| }; |
| |
| /// Helper class to build new ThriftServer instances. |
| class ThriftServerBuilder { |
| public: |
| ThriftServerBuilder(const std::string& name, |
| const boost::shared_ptr<apache::thrift::TProcessor>& processor, int port) |
| : name_(name), processor_(processor), port_(port) {} |
| |
| /// Sets the auth provider for this server. Default is the system global auth provider. |
| ThriftServerBuilder& auth_provider(AuthProvider* provider) { |
| auth_provider_ = provider; |
| return *this; |
| } |
| |
| /// Sets the metrics instance that this server should register metrics with. Default is |
| /// nullptr. |
| ThriftServerBuilder& metrics(MetricGroup* metrics) { |
| metrics_ = metrics; |
| return *this; |
| } |
| |
| /// Sets the maximum concurrent thread count for this server. Default is 0, which means |
| /// there is no enforced limit. |
| ThriftServerBuilder& max_concurrent_connections(int max_concurrent_connections) { |
| max_concurrent_connections_ = max_concurrent_connections; |
| return *this; |
| } |
| |
| /// Enables SSL for this server. |
| ThriftServerBuilder& ssl( |
| const std::string& certificate, const std::string& private_key) { |
| enable_ssl_ = true; |
| certificate_ = certificate; |
| private_key_ = private_key; |
| return *this; |
| } |
| |
| /// Sets the SSL/TLS client version(s) that this server will allow to connect. |
| ThriftServerBuilder& ssl_version(apache::thrift::transport::SSLProtocol version) { |
| version_ = version; |
| return *this; |
| } |
| |
| /// Sets the command used to compute the password for the SSL private key. Default is |
| /// empty, i.e. no password needed. |
| ThriftServerBuilder& pem_password_cmd(const std::string& pem_password_cmd) { |
| pem_password_cmd_ = pem_password_cmd; |
| return *this; |
| } |
| |
| /// Sets the list of acceptable cipher suites for this server. Default is to use all |
| /// available system cipher suites. |
| ThriftServerBuilder& cipher_list(const std::string& ciphers) { |
| ciphers_ = ciphers; |
| return *this; |
| } |
| |
| /// Constructs a new ThriftServer and puts it in 'server', if construction was |
| /// successful, returns an error otherwise. In the error case, 'server' will not have |
| /// been set and will not need to be freed, otherwise the caller assumes ownership of |
| /// '*server'. |
| Status Build(ThriftServer** server) { |
| std::unique_ptr<ThriftServer> ptr(new ThriftServer(name_, processor_, port_, |
| auth_provider_, metrics_, max_concurrent_connections_)); |
| if (enable_ssl_) { |
| RETURN_IF_ERROR(ptr->EnableSsl( |
| version_, certificate_, private_key_, pem_password_cmd_, ciphers_)); |
| } |
| (*server) = ptr.release(); |
| return Status::OK(); |
| } |
| |
| private: |
| int max_concurrent_connections_ = 0; |
| std::string name_; |
| boost::shared_ptr<apache::thrift::TProcessor> processor_; |
| int port_ = 0; |
| |
| AuthProvider* auth_provider_ = nullptr; |
| MetricGroup* metrics_ = nullptr; |
| |
| bool enable_ssl_ = false; |
| apache::thrift::transport::SSLProtocol version_ = |
| apache::thrift::transport::SSLProtocol::TLSv1_0; |
| std::string certificate_; |
| std::string private_key_; |
| std::string pem_password_cmd_; |
| std::string ciphers_; |
| }; |
| |
| /// Contains a map from string for --ssl_minimum_version to Thrift's SSLProtocol. |
| struct SSLProtoVersions { |
| static std::map<std::string, apache::thrift::transport::SSLProtocol> PROTO_MAP; |
| |
| /// Given a string, find a corresponding SSLProtocol from PROTO_MAP. Returns an error if |
| /// one cannot be found. Matching is case-insensitive. |
| static Status StringToProtocol( |
| const std::string& in, apache::thrift::transport::SSLProtocol* protocol); |
| |
| /// Returns true if 'protocol' is supported by the version of OpenSSL this binary is |
| /// linked to. |
| static bool IsSupported(const apache::thrift::transport::SSLProtocol& protocol); |
| }; |
| |
| } |
| |
| #endif |