| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License |
| |
| #ifdef __WINDOWS__ |
| // NOTE: This must be included before the OpenSSL headers as it includes |
| // `WinSock2.h` and `Windows.h` in the correct order. |
| #include <stout/windows.hpp> |
| #endif // __WINDOWS__ |
| |
| #include <openssl/bio.h> |
| #include <openssl/ssl.h> |
| #include <openssl/err.h> |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <queue> |
| |
| #include <boost/shared_array.hpp> |
| |
| #include <process/io.hpp> |
| #include <process/loop.hpp> |
| #include <process/owned.hpp> |
| #include <process/socket.hpp> |
| |
| #include <process/ssl/flags.hpp> |
| |
| #include <stout/net.hpp> |
| #include <stout/synchronized.hpp> |
| #include <stout/unimplemented.hpp> |
| #include <stout/unreachable.hpp> |
| |
| #include <stout/os/lseek.hpp> |
| |
| #include "openssl.hpp" |
| |
| #include "ssl/openssl_socket.hpp" |
| |
| using process::network::openssl::Mode; |
| |
| namespace process { |
| namespace network { |
| namespace internal { |
| |
| // Contains the state of a source/sink BIO wrapper for an ordinary socket. |
| struct SocketBIOData |
| { |
| // Socket associated with this BIO. |
| // OpenSSLSocketImpl retains ownership of the socket. |
| int_fd socket; |
| |
| // Stores the latest call to `BIO_write`. |
| struct SendRequest |
| { |
| SendRequest(Future<size_t> _future) |
| : future(_future) {} |
| |
| Future<size_t> future; |
| }; |
| |
| // Stores the latest call to `BIO_read`. |
| struct RecvRequest |
| { |
| RecvRequest( |
| char* _data, |
| size_t _size, |
| Future<size_t> _future) |
| : data(_data), |
| size(_size), |
| future(_future) {} |
| |
| char* data; // NOT owned by this object. |
| size_t size; |
| Future<size_t> future; |
| }; |
| |
| // Protects the following instance variables. |
| std::atomic_flag lock = ATOMIC_FLAG_INIT; |
| Owned<SendRequest> send_request; |
| Owned<RecvRequest> recv_request; |
| bool reached_eof; |
| }; |
| |
| |
| // Called in response to `BIO_new()`. |
| // We will need to perform some additional initialization to link |
| // the OpenSSLSocketImpl to the BIO, outside of this function. |
| // |
| // See: https://www.openssl.org/docs/man1.1.1/man3/BIO_get_data.html |
| int bio_libprocess_create(BIO* bio) |
| { |
| // Indicate that initialization has not completed. |
| BIO_set_init(bio, 0); |
| |
| // The caller will need to fill in the data with a copy of the `int_fd` |
| // associated with the OpenSSLSocketImpl, after BIO creation. |
| BIO_set_data(bio, new SocketBIOData()); |
| |
| return 1; |
| } |
| |
| |
| // Called in response to `BIO_free()`. |
| int bio_libprocess_destroy(BIO* bio) |
| { |
| SocketBIOData* data = reinterpret_cast<SocketBIOData*>(BIO_get_data(bio)); |
| CHECK_NOTNULL(data); |
| delete data; |
| |
| return 1; |
| } |
| |
| |
| // Called in response to `BIO_write()`. |
| // This function will maintain a single pending write at any time |
| // by swapping the contents of `send_request`. |
| int bio_libprocess_write(BIO* bio, const char* input, int length) |
| { |
| SocketBIOData* data = reinterpret_cast<SocketBIOData*>(BIO_get_data(bio)); |
| CHECK_NOTNULL(data); |
| |
| synchronized (data->lock) { |
| // Only a single write should be pending at any time. |
| if (data->send_request.get() == nullptr || |
| data->send_request->future.isReady()) { |
| Owned<SocketBIOData::SendRequest> request( |
| new SocketBIOData::SendRequest( |
| io::write(data->socket, input, length))); |
| |
| std::swap(request, data->send_request); |
| return length; |
| } |
| |
| BIO_set_retry_write(bio); |
| return 0; |
| } |
| } |
| |
| |
| // Called in response to `BIO_read()`. |
| // This function will maintain a single pending read at any time |
| // by swapping the contents of `recv_request`. |
| int bio_libprocess_read(BIO* bio, char* output, int length) |
| { |
| SocketBIOData* data = reinterpret_cast<SocketBIOData*>(BIO_get_data(bio)); |
| CHECK_NOTNULL(data); |
| |
| synchronized (data->lock) { |
| // Only a single read should be pending at any time. |
| if (data->recv_request.get() == nullptr) { |
| Owned<SocketBIOData::RecvRequest> request( |
| new SocketBIOData::RecvRequest( |
| output, length, io::read(data->socket, output, length))); |
| |
| std::swap(request, data->recv_request); |
| } else if (data->recv_request->future.isReady()) { |
| Owned<SocketBIOData::RecvRequest> completed_request; |
| std::swap(completed_request, data->recv_request); |
| |
| // When retrying a read, the arguments passed in must be identical |
| // to the previous attempt. This is an API requirement of `SSL_read`. |
| // See: https://www.openssl.org/docs/man1.1.1/man3/SSL_read.html |
| // "The calling process then must repeat the call after taking |
| // appropriate action to satisfy the needs of the read function." |
| // |
| // This guarantee means we can read onto the same buffer between retries, |
| // confident that the same output buffer will be allocated and available |
| // each time. |
| CHECK_EQ(completed_request->data, output); |
| CHECK_EQ(completed_request->size, length); |
| |
| if (completed_request->future.get() == 0u) { |
| data->reached_eof = true; |
| } |
| |
| return completed_request->future.get(); |
| } |
| |
| BIO_set_retry_read(bio); |
| return 0; |
| } |
| } |
| |
| |
| // Called in response to `BIO_ctrl()`, which is usually wrapped by |
| // different macros, i.e. `BIO_reset()`, `BIO_eof()`, `BIO_flush()`, etc. |
| // |
| // The enums implemented below were based on Libevent's BIO implementation |
| // and OpenSSL's BSS (BIO Source Sink) Socket, found here: |
| // https://github.com/openssl/openssl/blob/OpenSSL_1_1_1/crypto/bio/bss_sock.c |
| // |
| // See: https://www.openssl.org/docs/man1.1.1/man3/BIO_ctrl.html |
| long bio_libprocess_ctrl(BIO* bio, int command, long, void*) |
| { |
| SocketBIOData* data = reinterpret_cast<SocketBIOData*>(BIO_get_data(bio)); |
| CHECK_NOTNULL(data); |
| |
| switch (command) { |
| // Returns 1 when a read request has returned 0 bytes, |
| // and otherwise returns 0. |
| case BIO_CTRL_EOF: { |
| synchronized (data->lock) { |
| if (data->reached_eof) { |
| return 1; |
| } |
| |
| return 0; |
| } |
| } |
| |
| // NOTE: We choose not to implement BIO_CTRL_FLUSH because this call |
| // expects blocking behavior, and is sometimes called from within OpenSSL's |
| // library functions. When necessary, the retry-write behavior should be |
| // sufficient to make sure writes succeed. |
| case BIO_CTRL_FLUSH: { |
| // NOTE: We must return a successful result here, even though we |
| // have not flushed any data, because OpenSSL considers a failure |
| // here unrecoverable and will try to close the connection. |
| return 1; |
| } |
| |
| // NOTE: Libevent implements BIO_CTRL_GET_CLOSE and BIO_CTRL_SET_CLOSE, |
| // which indicates that the underlying I/O stream should be closed when |
| // the BIO is freed. We opt to always close/free the socket/BIO. |
| |
| // NOTE: We choose not to implement BIO_CTRL_PENDING and BIO_CTRL_WPENDING |
| // because this implementation only keeps a single read/write buffered |
| // at once. Also, these methods are not used by OpenSSL or by callers |
| // of our implementation. |
| |
| default: |
| return 0; // Not implemented. |
| } |
| } |
| |
| |
| // Constructs a new BIO_METHOD wrapping the libprocess event loop. |
| // |
| // See: https://www.openssl.org/docs/man1.1.1/man3/BIO_meth_new.html |
| static BIO_METHOD* libprocess_bio = nullptr; |
| static BIO_METHOD* get_libprocess_BIO_METHOD() |
| { |
| if (libprocess_bio != nullptr) { |
| return libprocess_bio; |
| } |
| |
| // Get a unique index for our new type, and annotate the index |
| // to say this BIO is a source/sink with a file descriptor. |
| int type = BIO_get_new_index(); |
| CHECK(type > 0) << "Failed to create a new BIO type"; |
| type = type|BIO_TYPE_SOURCE_SINK|BIO_TYPE_DESCRIPTOR; |
| |
| libprocess_bio = BIO_meth_new(type, "libprocess"); |
| |
| BIO_meth_set_create(libprocess_bio, bio_libprocess_create); |
| BIO_meth_set_destroy(libprocess_bio, bio_libprocess_destroy); |
| |
| BIO_meth_set_write(libprocess_bio, bio_libprocess_write); |
| BIO_meth_set_read(libprocess_bio, bio_libprocess_read); |
| |
| BIO_meth_set_ctrl(libprocess_bio, bio_libprocess_ctrl); |
| |
| return libprocess_bio; |
| } |
| |
| |
| // Constructs a new source/sink BIO around the given socket. |
| static BIO* BIO_new_libprocess(int_fd socket) |
| { |
| BIO* bio = BIO_new(get_libprocess_BIO_METHOD()); |
| CHECK_NOTNULL(bio); |
| |
| SocketBIOData* data = reinterpret_cast<SocketBIOData*>(BIO_get_data(bio)); |
| CHECK_NOTNULL(data); |
| |
| // Fill in the socket field of the new BIO. |
| data->socket = socket; |
| |
| BIO_set_init(bio, 1); |
| |
| return bio; |
| } |
| |
| |
| Try<std::shared_ptr<SocketImpl>> OpenSSLSocketImpl::create(int_fd socket) |
| { |
| openssl::initialize(); |
| |
| if (!openssl::flags().enabled) { |
| return Error("SSL is disabled"); |
| } |
| |
| return std::make_shared<OpenSSLSocketImpl>(socket); |
| } |
| |
| |
| OpenSSLSocketImpl::OpenSSLSocketImpl(int_fd socket) |
| : PollSocketImpl(socket), |
| ssl(nullptr), |
| dirty_shutdown(false) {} |
| |
| |
| OpenSSLSocketImpl::~OpenSSLSocketImpl() |
| { |
| if (ssl != nullptr) { |
| SSL_free(ssl); |
| ssl = nullptr; |
| } |
| |
| if (compute_thread.isSome()) { |
| process::terminate(compute_thread.get()); |
| compute_thread = None(); |
| } |
| } |
| |
| |
| Future<Nothing> OpenSSLSocketImpl::connect( |
| const Address& address) |
| { |
| LOG(FATAL) << "No TLS config was passed to a SSL socket."; |
| } |
| |
| |
| Future<Nothing> OpenSSLSocketImpl::connect( |
| const Address& address, |
| const openssl::TLSClientConfig& config) |
| { |
| if (client_config.isSome()) { |
| return Failure("Socket is already connecting or connected"); |
| } |
| |
| if (config.ctx == nullptr) { |
| return Failure("Invalid SSL context"); |
| } |
| |
| // NOTE: The OpenSSLSocketImpl destructor is responsible for calling |
| // `SSL_free` on this SSL object. |
| ssl = SSL_new(config.ctx); |
| if (ssl == nullptr) { |
| return Failure("Failed to connect: SSL_new"); |
| } |
| |
| client_config = config; |
| |
| if (config.configure_socket) { |
| Try<Nothing> configured = config.configure_socket( |
| ssl, address, config.servername); |
| |
| if (configured.isError()) { |
| return Failure("Failed to configure socket: " + configured.error()); |
| } |
| } |
| |
| // Set the SSL context in client mode. |
| SSL_set_connect_state(ssl); |
| |
| if (address.family() == Address::Family::INET4 || |
| address.family() == Address::Family::INET6) { |
| Try<inet::Address> inet_address = network::convert<inet::Address>(address); |
| |
| if (inet_address.isError()) { |
| return Failure("Failed to convert address: " + inet_address.error()); |
| } |
| |
| // Determine the 'peer_ip' from the address we're connecting to in |
| // order to properly verify the certificate later. |
| peer_ip = inet_address->ip; |
| } |
| |
| if (config.servername.isSome()) { |
| VLOG(2) << "Connecting to " << config.servername.get() << " at " << address; |
| } else { |
| VLOG(2) << "Connecting to " << address << " with no hostname specified"; |
| } |
| |
| // Hold a weak pointer since the connection (plus handshaking) |
| // might never complete. |
| std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this)); |
| |
| // Connect like a normal socket, then setup the I/O abstraction with OpenSSL |
| // and perform the TLS handshake. |
| return PollSocketImpl::connect(address) |
| .then([weak_self]() -> Future<size_t> { |
| std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); |
| if (self == nullptr) { |
| return Failure("Socket destroyed while connecting"); |
| } |
| |
| return self->set_ssl_and_do_handshake(self->ssl); |
| }) |
| .then([weak_self]() -> Future<Nothing> { |
| std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); |
| if (self == nullptr) { |
| return Failure("Socket destroyed while connecting"); |
| } |
| |
| // Time to do post-verification. |
| CHECK(self->client_config.isSome()); |
| |
| if (self->client_config->verify) { |
| Try<Nothing> verify = self->client_config->verify( |
| self->ssl, self->client_config->servername, self->peer_ip); |
| |
| if (verify.isError()) { |
| VLOG(1) << "Failed connect, verification error: " << verify.error(); |
| |
| return Failure(verify.error()); |
| } |
| } |
| |
| return Nothing(); |
| }); |
| } |
| |
| |
| Future<size_t> OpenSSLSocketImpl::recv(char* output, size_t size) |
| { |
| if (dirty_shutdown) { |
| return Failure("Socket is shutdown"); |
| } |
| |
| // Hold a weak pointer since the incoming socket is not guaranteed |
| // to terminate before the receiving end does. |
| std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this)); |
| |
| return process::loop( |
| compute_thread, |
| [weak_self, output, size]() -> Future<int> { |
| std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); |
| if (self == nullptr) { |
| return Failure("Socket destroyed while receiving"); |
| } |
| |
| ERR_clear_error(); |
| return SSL_read(self->ssl, output, size); |
| }, |
| [weak_self](int result) -> Future<ControlFlow<size_t>> { |
| std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); |
| if (self == nullptr) { |
| return Failure("Socket destroyed while receiving"); |
| } |
| |
| if (result == 0) { |
| // Check if EOF has been reached. |
| BIO* bio = SSL_get_rbio(self->ssl); |
| if (BIO_eof(bio) == 1) { |
| return Break(0u); |
| } |
| } |
| |
| return self->handle_ssl_return_result(result, true); |
| }); |
| } |
| |
| |
| Future<size_t> OpenSSLSocketImpl::send(const char* input, size_t size) |
| { |
| if (dirty_shutdown) { |
| return Failure("Socket is shutdown"); |
| } |
| |
| // Hold a weak pointer since a write may become backlogged indefinitely. |
| std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this)); |
| |
| return process::loop( |
| compute_thread, |
| [weak_self, input, size]() -> Future<int> { |
| std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); |
| if (self == nullptr) { |
| return Failure("Socket destroyed while sending"); |
| } |
| |
| ERR_clear_error(); |
| return SSL_write(self->ssl, input, size); |
| }, |
| [weak_self](int result) -> Future<ControlFlow<size_t>> { |
| std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); |
| if (self == nullptr) { |
| return Failure("Socket destroyed while sending"); |
| } |
| |
| return self->handle_ssl_return_result(result, false); |
| }); |
| } |
| |
| |
| Future<size_t> OpenSSLSocketImpl::sendfile( |
| int_fd fd, off_t offset, size_t size) |
| { |
| if (dirty_shutdown) { |
| return Failure("Socket is shutdown"); |
| } |
| |
| // Hold a weak pointer since both read and write are not guaranteed to finish. |
| std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this)); |
| |
| Try<off_t> seek = os::lseek(fd, offset, SEEK_SET); |
| if (seek.isError()) { |
| return Failure("Failed to seek: " + seek.error()); |
| } |
| |
| Try<Nothing> async = io::prepare_async(fd); |
| if (async.isError()) { |
| return Failure("Failed to make FD asynchronous: " + async.error()); |
| } |
| |
| size_t remaining_size = size; |
| boost::shared_array<char> data(new char[io::BUFFERED_READ_SIZE]); |
| |
| return process::loop( |
| compute_thread, |
| [weak_self, fd, remaining_size, data]() -> Future<size_t> { |
| return io::read( |
| fd, data.get(), std::min(io::BUFFERED_READ_SIZE, remaining_size)) |
| .then([weak_self, data](size_t read_bytes) -> Future<size_t> { |
| std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); |
| if (self == nullptr) { |
| return Failure("Socket destroyed while sending file"); |
| } |
| |
| return self->send(data.get(), read_bytes); |
| }); |
| }, |
| [size, &remaining_size](size_t written) mutable |
| -> Future<ControlFlow<size_t>> { |
| remaining_size -= written; |
| |
| if (remaining_size > 0) { |
| return Continue(); |
| } |
| |
| return Break(size); |
| }); |
| } |
| |
| |
| Future<std::shared_ptr<SocketImpl>> OpenSSLSocketImpl::accept() |
| { |
| if (!accept_loop_started.once()) { |
| // Hold a weak pointer since we do not want this accept loop to extend |
| // the lifetime of `this` unnecessarily. |
| std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this)); |
| |
| // We start accepting incoming connections in a loop here because a socket |
| // must complete the SSL handshake (or be downgraded) before the socket is |
| // considered ready. In case the incoming socket never writes any data, |
| // we do not wait for the accept logic to complete before accepting a |
| // new socket. |
| process::loop( |
| compute_thread, |
| [weak_self]() -> Future<std::shared_ptr<SocketImpl>> { |
| std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); |
| |
| if (self == nullptr) { |
| return Failure("Socket destructed"); |
| } |
| |
| return self->PollSocketImpl::accept(); |
| }, |
| [weak_self](const std::shared_ptr<SocketImpl>& socket) |
| -> Future<ControlFlow<Nothing>> { |
| std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); |
| |
| if (self == nullptr) { |
| return Break(); |
| } |
| |
| // If we support downgrading the connection, first wait for this |
| // socket to become readable. We will then MSG_PEEK it to test |
| // whether we want to dispatch as SSL or non-SSL. |
| if (openssl::flags().support_downgrade) { |
| io::poll(socket->get(), process::io::READ) |
| .onReady([weak_self, socket]() { |
| std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); |
| |
| if (self == nullptr) { |
| return; |
| } |
| |
| char data[6]; |
| |
| // Try to peek the first 6 bytes of the message. |
| ssize_t size = ::recv(socket->get(), data, 6, MSG_PEEK); |
| |
| // Based on the function 'ssl23_get_client_hello' in openssl, we |
| // test whether to dispatch to the SSL or non-SSL based accept |
| // based on the following rules: |
| // 1. If there are fewer than 3 bytes: non-SSL. |
| // 2. If the 1st bit of the 1st byte is set AND the 3rd byte |
| // is equal to SSL2_MT_CLIENT_HELLO: SSL. |
| // 3. If the 1st byte is equal to SSL3_RT_HANDSHAKE AND the |
| // 2nd byte is equal to SSL3_VERSION_MAJOR and the 6th byte |
| // is equal to SSL3_MT_CLIENT_HELLO: SSL. |
| // 4. Otherwise: non-SSL. |
| |
| // For an ascii based protocol to falsely get dispatched to SSL |
| // it needs to: |
| // 1. Start with an invalid ascii character (0x80). |
| // 2. OR have the first 2 characters be a SYN followed by ETX, |
| // and then the 6th character be SOH. |
| // These conditions clearly do not constitute valid HTTP |
| // requests, and are unlikely to collide with other existing |
| // protocols. |
| |
| bool ssl = false; // Default to rule 4. |
| |
| if (size < 2) { // Rule 1. |
| ssl = false; |
| } else if ((data[0] & 0x80) && |
| data[2] == SSL2_MT_CLIENT_HELLO) { // Rule 2. |
| ssl = true; |
| } else if (data[0] == SSL3_RT_HANDSHAKE && |
| data[1] == SSL3_VERSION_MAJOR && |
| data[5] == SSL3_MT_CLIENT_HELLO) { // Rule 3. |
| ssl = true; |
| } |
| |
| if (ssl) { |
| self->handle_accept_callback(socket); |
| } else { |
| self->accept_queue.put(socket); |
| } |
| }); |
| } else { |
| self->handle_accept_callback(socket); |
| } |
| |
| return Continue(); |
| }); |
| |
| accept_loop_started.done(); |
| } |
| |
| // NOTE: In order to not deadlock the libprocess socket manager, we must |
| // defer accepted sockets regardless of success or failure. This prevents |
| // the socket manager from recursively calling `on_accept` and deadlocking. |
| return accept_queue.get() |
| .repair(defer( |
| [](const Future<Future<std::shared_ptr<SocketImpl>>>& failure) { |
| return failure; |
| })) |
| .then(defer( |
| [](const Future<std::shared_ptr<SocketImpl>>& impl) |
| -> Future<std::shared_ptr<SocketImpl>> { |
| CHECK(!impl.isPending()); |
| return impl; |
| })); |
| } |
| |
| |
| Try<Nothing, SocketError> OpenSSLSocketImpl::shutdown(int how) |
| { |
| if (dirty_shutdown) { |
| return Nothing(); |
| } |
| |
| // Treat this as a dirty shutdown (i.e. closing the socket before sending |
| // the SSL close notification) because we are not guaranteed to properly |
| // shutdown synchronously. |
| dirty_shutdown = true; |
| |
| // Hold a weak pointer since we are ok with closing the socket before |
| // shutdown is completed gracefully. |
| std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this)); |
| |
| // The shutdown itself will happen asynchronously. |
| process::loop( |
| compute_thread, |
| [weak_self]() -> Future<int> { |
| std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); |
| if (self == nullptr) { |
| return Failure("Socket destroyed while doing shutdown"); |
| } |
| |
| ERR_clear_error(); |
| return SSL_shutdown(self->ssl); |
| }, |
| [weak_self](int result) -> Future<ControlFlow<size_t>> { |
| std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); |
| if (self == nullptr) { |
| return Failure("Socket destroyed while doing shutdown"); |
| } |
| |
| // A successful shutdown will return 0 if the close notification |
| // was sent, or 1 if both sides of the connection have closed. |
| // Either case is sufficient for a clean shutdown. |
| if (result >= 0) { |
| return Break(0u); |
| } |
| |
| // Check if EOF has been reached. |
| BIO* bio = SSL_get_rbio(self->ssl); |
| if (BIO_eof(bio) == 1) { |
| return Break(0u); |
| } |
| |
| return self->handle_ssl_return_result(result, false); |
| }); |
| |
| return Nothing(); |
| } |
| |
| |
| void OpenSSLSocketImpl::handle_accept_callback( |
| const std::shared_ptr<SocketImpl>& socket) |
| { |
| // Wrap this new socket up into our SSL wrapper class by releasing |
| // the FD and creating a new OpenSSLSocketImpl object with the FD. |
| const std::shared_ptr<OpenSSLSocketImpl> ssl_socket = |
| std::make_shared<OpenSSLSocketImpl>(socket->release()); |
| |
| // Set up SSL object. |
| SSL* accept_ssl = SSL_new(openssl::context()); |
| if (accept_ssl == nullptr) { |
| accept_queue.put(Failure("Accept failed, SSL_new")); |
| return; |
| } |
| |
| Try<Address> peer_address = network::peer(ssl_socket->get()); |
| if (!peer_address.isSome()) { |
| SSL_free(accept_ssl); |
| accept_queue.put( |
| Failure("Failed to determine peer IP: " + peer_address.error())); |
| return; |
| } |
| |
| // NOTE: Right now, `openssl::configure_socket` does not do anything |
| // in server mode, but we still pass the correct peer address to |
| // enable modules to implement application-level logic in the future. |
| Try<Nothing> configured = openssl::configure_socket( |
| accept_ssl, Mode::SERVER, peer_address.get(), None()); |
| |
| if (configured.isError()) { |
| SSL_free(accept_ssl); |
| accept_queue.put( |
| Failure("Failed to openssl::configure_socket for " + |
| stringify(*peer_address) + ": " + configured.error())); |
| return; |
| } |
| |
| // Set the SSL context in server mode. |
| SSL_set_accept_state(accept_ssl); |
| |
| // Hold a weak pointer since we do not want this accept function to extend |
| // the lifetime of `this` unnecessarily. |
| std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this)); |
| |
| // Pass ownership of `accept_ssl` to the newly accepted socket, |
| // and start the SSL handshake. When the SSL handshake completes, |
| // the listening socket will place the result (failure or success) |
| // onto the listening socket's `accept_queue`. |
| // |
| // TODO(josephw): Add a timeout to catch/close incoming sockets which |
| // never finish the SSL handshake. |
| ssl_socket->set_ssl_and_do_handshake(accept_ssl) |
| .onAny([weak_self, ssl_socket](Future<size_t> result) { |
| std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); |
| |
| if (self == nullptr) { |
| return; |
| } |
| |
| // For verification purposes, we need to grab the address (again). |
| // We grab it up here (rather than down below) so that we can log |
| // it if the `result` is failed. |
| Try<Address> address = network::address(ssl_socket->get()); |
| |
| if (result.isFailed()) { |
| self->accept_queue.put( |
| Failure("Failed to SSL handshake" + |
| (address.isSome() ? " with " + stringify(*address) : "") + |
| ": " + result.failure())); |
| return; |
| } |
| |
| if (address.isError()) { |
| self->accept_queue.put( |
| Failure("Failed to determine peer IP: " + address.error())); |
| return; |
| } |
| |
| Try<inet::Address> inet_address = |
| network::convert<inet::Address>(address.get()); |
| |
| Try<Nothing> verify = openssl::verify( |
| ssl_socket->ssl, |
| Mode::SERVER, |
| None(), |
| inet_address.isSome() |
| ? Some(inet_address->ip) |
| : Option<net::IP>::none()); |
| |
| if (verify.isError()) { |
| VLOG(1) << "Failed accept for " << *address |
| << ", verification error: " << verify.error(); |
| |
| self->accept_queue.put(Failure(verify.error())); |
| return; |
| } |
| |
| self->accept_queue.put(ssl_socket); |
| }); |
| } |
| |
| |
| Future<size_t> OpenSSLSocketImpl::set_ssl_and_do_handshake(SSL* _ssl) |
| { |
| // NOTE: We would normally create this UPID in the socket's constructor. |
| // However, during libprocess initialization, the libprocess listening socket |
| // is constructed before spawning processes is allowed. |
| // This function is guaranteed to be called for any SSL socket that may |
| // transmit encrypted data. Listening sockets will not create a UPID. |
| if (compute_thread.isNone()) { |
| compute_thread = spawn(new ProcessBase(), true); |
| } |
| |
| // Save a reference to the SSL object. |
| ssl = _ssl; |
| |
| // Construct the BIO wrapper for the underlying socket. |
| // |
| // NOTE: This transfers ownership of the BIO to the SSL object. |
| // The BIO will be freed upon calling `SSL_free(ssl)`. |
| BIO* bio = BIO_new_libprocess(get()); |
| SSL_set_bio(ssl, bio, bio); |
| |
| // Hold a weak pointer since the handshake may potentially never complete. |
| std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this)); |
| |
| return process::loop( |
| compute_thread, |
| [weak_self]() -> Future<int> { |
| std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); |
| if (self == nullptr) { |
| return Failure("Socket destroyed while doing handshake"); |
| } |
| |
| ERR_clear_error(); |
| return SSL_do_handshake(self->ssl); |
| }, |
| [weak_self](int result) -> Future<ControlFlow<size_t>> { |
| std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock()); |
| if (self == nullptr) { |
| return Failure("Socket destroyed while doing handshake"); |
| } |
| |
| // Check if EOF has been reached. |
| BIO* bio = SSL_get_rbio(self->ssl); |
| if (BIO_eof(bio) == 1) { |
| return Failure("EOF while doing handshake"); |
| } |
| |
| return self->handle_ssl_return_result(result, false); |
| }); |
| } |
| |
| |
| Future<ControlFlow<size_t>> OpenSSLSocketImpl::handle_ssl_return_result( |
| int result, |
| bool handle_as_read) |
| { |
| if (result > 0) { |
| // Successful result, potentially meaning a connected/accepted socket |
| // or a completed send/recv request. |
| return Break(result); |
| } |
| |
| // Not a success, so we'll need to have the BIO and associated data handy. |
| BIO* bio = SSL_get_rbio(ssl); |
| SocketBIOData* data = reinterpret_cast<SocketBIOData*>(BIO_get_data(bio)); |
| CHECK_NOTNULL(data); |
| |
| int error = SSL_get_error(ssl, result); |
| switch (error) { |
| case SSL_ERROR_WANT_READ: { |
| synchronized (data->lock) { |
| if (data->recv_request.get() != nullptr) { |
| return data->recv_request->future |
| .then([]() -> Future<ControlFlow<size_t>> { |
| return Continue(); |
| }); |
| } |
| } |
| |
| return Continue(); |
| } |
| case SSL_ERROR_WANT_WRITE: { |
| synchronized (data->lock) { |
| if (data->send_request.get() != nullptr) { |
| return data->send_request->future |
| .then([]() -> Future<ControlFlow<size_t>> { |
| return Continue(); |
| }); |
| } |
| } |
| |
| return Continue(); |
| } |
| case SSL_ERROR_WANT_CLIENT_HELLO_CB: |
| case SSL_ERROR_WANT_X509_LOOKUP: |
| return Failure("Not implemented"); |
| case SSL_ERROR_ZERO_RETURN: |
| if (handle_as_read) { |
| return Break(0u); |
| } else { |
| return Failure("TLS connection has been closed"); |
| } |
| case SSL_ERROR_WANT_ASYNC: |
| case SSL_ERROR_WANT_ASYNC_JOB: |
| // We do not use `SSL_MODE_ASYNC`. |
| case SSL_ERROR_WANT_CONNECT: |
| case SSL_ERROR_WANT_ACCEPT: |
| // We make sure the underlying socket is connected prior |
| // to any interaction with the OpenSSL library. |
| UNREACHABLE(); |
| case SSL_ERROR_SSL: |
| dirty_shutdown = true; |
| return Failure("Protocol error"); |
| case SSL_ERROR_SYSCALL: |
| dirty_shutdown = true; |
| |
| // NOTE: If there is an error (`ERR_peek_error() != 0`), |
| // we fall through to the default error handling case. |
| if (ERR_peek_error() == 0) { |
| return Failure("TCP connection closed before SSL termination"); |
| } |
| default: { |
| char buffer[1024] = {}; |
| std::string error_strings; |
| while ((error = ERR_get_error())) { |
| ERR_error_string_n(error, buffer, sizeof(buffer)); |
| error_strings += "\n" + stringify(buffer); |
| } |
| return Failure("Failed with error:" + error_strings); |
| } |
| }; |
| } |
| |
| } // namespace internal { |
| } // namespace network { |
| } // namespace process { |