blob: e67f956fae0c2b5f945c423fc796d3f5ededa7dc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/util/net/socket.h"
#include <unistd.h>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/thread.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
using std::string;
namespace kudu {
constexpr size_t kEchoChunkSize = 32 * 1024 * 1024;
// A test scenario to make sure Sockaddr::HashCode() works as expected for
// addresses which are logically the same. In essence, the implementation of
// the Sockaddr class should zero out the zero padding field
// sockaddr_in::sin_zero to avoid issues related to not-initialized and former
// contents of the memory backing the zero padding field.
TEST(SockaddrHashTest, ZeroPadding) {
constexpr const char* const kIpAddr = "127.0.0.1";
constexpr const char* const kPath = "/tmp/some/long/enough/path/to.sock";
constexpr uint16_t kPort = 5678;
Sockaddr s_in;
ASSERT_OK(s_in.ParseString(kIpAddr, kPort));
Sockaddr s_un;
ASSERT_OK(s_un.ParseUnixDomainPath(kPath));
// Make 's_un' to be logically the same object as 's_in', but reusing the
// Sockaddr::storage_ field from its prior incarnation.
ASSERT_OK(s_un.ParseString(kIpAddr, kPort));
// The hash should be the same since 's_in' and 's_un' represent the same
// logical entity.
ASSERT_EQ(s_in.HashCode(), s_un.HashCode());
ASSERT_EQ(s_in, s_un);
Sockaddr s_in_0(s_in);
ASSERT_EQ(s_in.HashCode(), s_in_0.HashCode());
ASSERT_EQ(s_in, s_in_0);
Sockaddr s_in_1(s_un);
ASSERT_EQ(s_in.HashCode(), s_in_1.HashCode());
ASSERT_EQ(s_in, s_in_1);
Sockaddr s_un_0;
ASSERT_OK(s_un.ParseUnixDomainPath(kPath));
s_un_0 = s_in_0;
ASSERT_EQ(s_in.HashCode(), s_un_0.HashCode());
ASSERT_EQ(s_in_0, s_un_0);
Sockaddr s_un_1;
ASSERT_OK(s_un_1.ParseUnixDomainPath(kPath));
s_un_1 = s_in.ipv4_addr();
ASSERT_EQ(s_in.HashCode(), s_un_1.HashCode());
ASSERT_EQ(s_in, s_un_1);
}
class SocketTest : public KuduTest {
protected:
Socket listener_;
Sockaddr listen_addr_;
void BindAndListen(const string& addr_str) {
Sockaddr address;
ASSERT_OK(address.ParseString(addr_str, 0));
BindAndListen(address);
}
void BindAndListenUnix(const string& path) {
Sockaddr address;
ASSERT_OK(address.ParseUnixDomainPath(path));
BindAndListen(address);
}
void BindAndListen(const Sockaddr& address) {
CHECK_OK(listener_.Init(address.family(), 0));
CHECK_OK(listener_.BindAndListen(address, 0));
CHECK_OK(listener_.GetSocketAddress(&listen_addr_));
}
Socket ConnectToListeningServer() {
Socket client;
CHECK_OK(client.Init(listen_addr_.family(), 0));
CHECK_OK(client.Connect(listen_addr_));
CHECK_OK(client.SetRecvTimeout(MonoDelta::FromMilliseconds(100)));
return client;
}
void DoTestServerDisconnects(bool accept, const std::string &message) {
NO_FATALS(BindAndListen("0.0.0.0:0"));
CountDownLatch latch(1);
scoped_refptr<kudu::Thread> t;
Status status = kudu::Thread::Create("pool", "worker", ([&]{
if (accept) {
Sockaddr new_addr;
Socket sock;
CHECK_OK(listener_.Accept(&sock, &new_addr, 0));
CHECK_OK(sock.Close());
} else {
while (!latch.WaitFor(MonoDelta::FromMilliseconds(10))) {}
CHECK_OK(listener_.Close());
}
}), &t);
ASSERT_OK(status);
SCOPED_CLEANUP({
latch.CountDown();
if (t) {
t->Join();
}
});
Socket client = ConnectToListeningServer();
int n;
std::unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
const auto s = client.Recv(buf.get(), kEchoChunkSize, &n);
ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
ASSERT_STR_MATCHES(s.message().ToString(), message);
}
void DoUnixSocketTest(const string& path) {
const string kData = "hello world over a socket";
NO_FATALS(BindAndListenUnix(path));
std::thread t(
[&]{
Sockaddr new_addr;
Socket sock;
CHECK_OK(listener_.Accept(&sock, &new_addr, 0));
// Test GetPeerAddress from server side.
Sockaddr peer_addr;
CHECK_OK(sock.GetPeerAddress(&peer_addr));
CHECK(HasPrefixString(peer_addr.ToString(), "unix:"));
size_t n_written;
CHECK_OK(sock.BlockingWrite(
reinterpret_cast<const uint8_t*>(kData.data()), kData.size(), &n_written,
MonoTime::Now() + MonoDelta::FromSeconds(10)));
CHECK_OK(sock.Close());
});
auto cleanup = MakeScopedCleanup([&] { t.join(); });
Socket client = ConnectToListeningServer();
// Test GetPeerAddress from client side.
Sockaddr peer_addr;
ASSERT_OK(client.GetPeerAddress(&peer_addr));
EXPECT_EQ("unix:" + path, peer_addr.ToString());
size_t n;
char buf[kData.size()];
ASSERT_OK(client.BlockingRecv(reinterpret_cast<uint8_t*>(buf), kData.size(), &n,
MonoTime::Now() + MonoDelta::FromSeconds(5)));
cleanup.cancel();
t.join();
ASSERT_OK(client.Close());
ASSERT_EQ(n, kData.size());
ASSERT_EQ(string(buf, n), kData);
}
};
TEST_F(SocketTest, TestRecvReset) {
DoTestServerDisconnects(false, "recv error from 127.0.0.1:[0-9]+: "
"Resource temporarily unavailable");
}
TEST_F(SocketTest, TestRecvEOF) {
DoTestServerDisconnects(true, "recv got EOF from 127.0.0.1:[0-9]+");
}
// Apple does not support abstract namespaces in sockets.
#if !defined(__APPLE__)
TEST_F(SocketTest, TestUnixSocketAbstractNamespace) {
DoUnixSocketTest(strings::Substitute("@kudu-test-pid-$0", getpid()));
}
#endif
TEST_F(SocketTest, TestUnixSocketFilesystemPath) {
string path = GetTestSocketPath("socket-test");
SCOPED_CLEANUP({
unlink(path.c_str());
});
DoUnixSocketTest(path);
}
} // namespace kudu