blob: 606a1c46e50936251c29af4b996007c480b2a135 [file] [log] [blame]
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#include <memory>
#include <string>
#include <boost/shared_array.hpp>
#include <process/io.hpp>
#include <process/loop.hpp>
#include <process/network.hpp>
#include <process/owned.hpp>
#include <process/socket.hpp>
#include <process/ssl/flags.hpp>
#include <stout/os.hpp>
#include <stout/unreachable.hpp>
#ifdef USE_SSL_SOCKET
#include "posix/libevent/libevent_ssl_socket.hpp"
#endif
#include "poll_socket.hpp"
using std::string;
namespace process {
namespace network {
namespace internal {
Try<std::shared_ptr<SocketImpl>> SocketImpl::create(int_fd s, Kind kind)
{
switch (kind) {
case Kind::POLL:
return PollSocketImpl::create(s);
#ifdef USE_SSL_SOCKET
case Kind::SSL:
return LibeventSSLSocketImpl::create(s);
#endif
}
UNREACHABLE();
}
Try<std::shared_ptr<SocketImpl>> SocketImpl::create(
Address::Family family,
Kind kind)
{
int domain = [=]() {
switch (family) {
case Address::Family::INET4: return AF_INET;
case Address::Family::INET6: return AF_INET6;
#ifndef __WINDOWS__
case Address::Family::UNIX: return AF_UNIX;
#endif // __WINDOWS__
}
UNREACHABLE();
}();
// Supported in Linux >= 2.6.27.
#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
Try<int_fd> s =
network::socket(domain, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
if (s.isError()) {
return Error("Failed to create socket: " + s.error());
}
#else
Try<int_fd> s = network::socket(domain, SOCK_STREAM, 0);
if (s.isError()) {
return Error("Failed to create socket: " + s.error());
}
Try<Nothing> async = io::prepare_async(s.get());
if (async.isError()) {
os::close(s.get());
return Error("Failed to create socket, prepare_async: " + async.error());
}
Try<Nothing> cloexec = os::cloexec(s.get());
if (cloexec.isError()) {
os::close(s.get());
return Error("Failed to create socket, cloexec: " + cloexec.error());
}
#endif
Try<std::shared_ptr<SocketImpl>> impl = create(s.get(), kind);
if (impl.isError()) {
os::close(s.get());
}
return impl;
}
SocketImpl::Kind SocketImpl::DEFAULT_KIND()
{
// NOTE: Some tests may change the OpenSSL flags and reinitialize
// libprocess. In non-test code, the return value should be constant.
#ifdef USE_SSL_SOCKET
return network::openssl::flags().enabled ? Kind::SSL : Kind::POLL;
#else
return Kind::POLL;
#endif
}
Try<Address> SocketImpl::address() const
{
// TODO(benh): Cache this result so that we don't have to make
// unnecessary system calls each time.
return network::address(get());
}
Try<Address> SocketImpl::peer() const
{
// TODO(benh): Cache this result so that we don't have to make
// unnecessary system calls each time.
return network::peer(get());
}
Try<Address> SocketImpl::bind(const Address& address)
{
Try<Nothing> bind = network::bind(get(), address);
if (bind.isError()) {
return Error(bind.error());
}
// Lookup and store assigned IP and assigned port.
return network::address(get());
}
Future<string> SocketImpl::recv(const Option<ssize_t>& size)
{
// Extend lifetime by holding onto a reference to ourself!
auto self = shared_from_this();
// Default chunk size to attempt to receive when nothing is
// specified represents roughly 16 pages.
static const size_t DEFAULT_CHUNK = 16 * os::pagesize();
const size_t chunk = (size.isNone() || size.get() < 0)
? DEFAULT_CHUNK
: size.get();
boost::shared_array<char> data(new char[chunk]);
string buffer;
return loop(
None(),
[=]() {
return self->recv(data.get(), chunk);
},
[=](size_t length) mutable -> ControlFlow<string> {
if (length == 0) { // EOF.
// Return everything we've received thus far, a subsequent
// receive will return an empty string.
return Break(std::move(buffer));
}
buffer.append(data.get(), length);
if (size.isNone()) {
// We've been asked just to return any data that we receive!
return Break(std::move(buffer));
} else if (size.get() < 0) {
// We've been asked to receive until EOF so keep receiving
// since according to the 'length == 0' check above we
// haven't reached EOF yet.
return Continue();
} else if (
static_cast<string::size_type>(size.get()) > buffer.size()) {
// We've been asked to receive a particular amount of data and we
// haven't yet received that much data so keep receiving.
return Continue();
}
// We've received as much data as requested, so return that data!
return Break(std::move(buffer));
});
}
Future<Nothing> SocketImpl::send(const string& data)
{
// Extend lifetime by holding onto a reference to ourself!
auto self = shared_from_this();
// We need to share the `index` between both lambdas below.
std::shared_ptr<size_t> index(new size_t(0));
// We store `data.size()` so that we won't make a copy of `data` in
// each lambda below since some `data` might be very big!
const size_t size = data.size();
return loop(
None(),
[=]() {
return self->send(data.data() + *index, size - *index);
},
[=](size_t length) -> ControlFlow<Nothing> {
if ((*index += length) != size) {
return Continue();
}
return Break();
});
}
} // namespace internal {
} // namespace network {
} // namespace process {