// blob: abe0efe740a3838294f2e6ec7dd172c8497e4dc8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "gflags/gflags.h"
#include "grpc/grpc.h"
#include "grpc++/security/server_credentials.h"
#include "grpc++/server.h"
#include "grpc++/server_builder.h"
#include "tmb/internal/net_service_impl.h"
#include "tmb/message_bus.h"
// ----------------------------------------------------------------------------
// Shared routines for parsing addresses of upstream servers.
// A single upstream server endpoint, as parsed from one "HOSTNAME:PORT" entry.
struct ServerAddress {
  std::string hostname;
  std::uint16_t port;
};

// Parses a comma-separated list of "HOSTNAME:PORT" (or "IP:PORT") entries.
//
// address_string: the raw list, e.g. "host1:1234,host2:5678". An empty string
//     is valid and yields no addresses. A single trailing comma is tolerated.
// addresses: on success the parsed entries are appended in input order. On
//     failure the vector is left exactly as it was passed in.
//
// Returns true if every entry was well-formed. Returns false for an empty
// entry (leading or doubled comma), a missing ':' separator, an empty
// hostname, or a port that is not a plain decimal number in [0, 65535].
bool ParseServerAddresses(const std::string &address_string,
                          std::vector<ServerAddress> *addresses) {
  // Collect into a scratch vector so that a malformed entry late in the list
  // does not leave earlier entries behind in the caller's vector.
  std::vector<ServerAddress> parsed;

  std::size_t pos = 0;
  while (pos < address_string.size()) {
    const std::size_t endpos = address_string.find(',', pos);
    if (endpos == pos) {
      // Empty entry: the string starts with a comma or has a double-comma.
      return false;
    }

    // When endpos is npos, substr() clamps the length to the end of string.
    const std::string host_string = address_string.substr(pos, endpos - pos);
    const std::size_t colon_pos = host_string.find(':');
    if (colon_pos == std::string::npos) {
      // Improperly formatted server string (no ":" character).
      return false;
    }
    if (colon_pos == 0) {
      // Empty hostname.
      return false;
    }

    const char *port_begin = host_string.c_str() + colon_pos + 1;
    if ((*port_begin < '0') || (*port_begin > '9')) {
      // strtol() would silently skip leading whitespace and accept a sign, so
      // insist that the port starts with a digit. This also rejects an empty
      // port.
      return false;
    }
    char *endptr = nullptr;
    const long parsed_port  // NOLINT(runtime/int)
        = std::strtol(port_begin, &endptr, 10);
    if (*endptr != '\0') {
      // Trailing garbage after the port number.
      return false;
    }
    if ((parsed_port < 0) || (parsed_port > 65535)) {
      // Port number out of range (strtol() saturates on overflow, which also
      // lands here).
      return false;
    }

    ServerAddress addr;
    addr.hostname = host_string.substr(0, colon_pos);
    addr.port = static_cast<std::uint16_t>(parsed_port);
    parsed.push_back(std::move(addr));

    // Step past the comma, or finish if this was the last entry.
    pos = (endpos == std::string::npos) ? endpos : endpos + 1;
  }

  for (ServerAddress &addr : parsed) {
    addresses->push_back(std::move(addr));
  }
  return true;
}
#if defined(TMB_VOLTDB_ENABLED) || defined(TMB_ZOOKEEPER_ENABLED)
// gflags validator for flags that hold a comma-separated server list.
//
// flagname: name of the flag being validated (used only in the error message).
// value: the candidate flag value.
//
// Returns true if ParseServerAddresses() accepts the value; otherwise prints
// a diagnostic to stderr and returns false.
static bool ValidateServerAddresses(const char *flagname,
                                    const std::string &value) {
  // Run the real parser into a throwaway vector; only the verdict matters.
  std::vector<ServerAddress> discarded;
  const bool well_formed = ParseServerAddresses(value, &discarded);
  if (!well_formed) {
    std::fprintf(stderr,
                 "Invalid value for --%s: %s\n", flagname, value.c_str());
  }
  return well_formed;
}
#endif // defined(TMB_VOLTDB_ENABLED) || defined(TMB_ZOOKEEPER_ENABLED)
// ----------------------------------------------------------------------------
// Conditionally-enabled includes and implementation-specific flags for various
// TMB implementations.
// Each section below is compiled in only when the corresponding TMB_*_ENABLED
// macro is defined. It pulls in that implementation's header and defines the
// command-line flags that CreateMessageBus() reads for it.

// LevelDB: only offered when memory mirroring is also compiled in.
#if defined(TMB_LEVELDB_ENABLED) && defined(TMB_MEMORYMIRROR_ENABLED)
#include "tmb/leveldb_message_bus.h"
DEFINE_string(leveldb_path, "tmb_leveldb",
"[leveldb] Path for LevelDB database.");
DEFINE_bool(leveldb_sync_writes, true,
"[leveldb] Synchronously write records.");
#endif // TMB_LEVELDB_ENABLED && TMB_MEMORYMIRROR_ENABLED
// Memory mirror: --memory_mirror is consulted by MemoryMirrorEnabled().
#ifdef TMB_MEMORYMIRROR_ENABLED
#include "tmb/memory_mirror_message_bus.h"
DEFINE_bool(memory_mirror, true,
"Turn on memory mirroring (usually improves performance, but all "
"connections to the underlying bus must go through this server). "
"Has no effect if tmb_implementation is purememory or "
"nativelogging (effectively always on in such cases). This MUST "
"be turned on if tmb_implementation is leveldb or zookeeper.");
#endif // TMB_MEMORYMIRROR_ENABLED
// Native transaction log.
#ifdef TMB_NATIVELOG_ENABLED
#include "tmb/native_logging_message_bus.h"
DEFINE_string(nativelog_filename, "tmb.log",
"[nativelog] Filename for native transaction log.");
DEFINE_bool(nativelog_sync_writes, true,
"[nativelog] Synchronously write log records.");
#endif // TMB_NATIVELOG_ENABLED
// Pure in-memory bus: no implementation-specific flags.
#ifdef TMB_PUREMEMORY_ENABLED
#include "tmb/pure_memory_message_bus.h"
#endif // TMB_PUREMEMORY_ENABLED
// SQLite.
#ifdef TMB_SQLITE_ENABLED
#include "tmb/sqlite_message_bus.h"
DEFINE_string(sqlite_filename, "tmb.db",
"[sqlite] Filename for database.");
DEFINE_bool(sqlite_use_wal, true,
"[sqlite] Use write-ahead logging.");
#endif // TMB_SQLITE_ENABLED
// VoltDB.
#ifdef TMB_VOLTDB_ENABLED
#include "tmb/voltdb_message_bus.h"
DEFINE_string(voltdb_servers, "",
"[voltdb] A comma-separated list of VoltDB servers to connect "
"to, each of which is given in HOSTNAME:PORT or IP:PORT "
"format.");
// The dummy constant exists only so that the validator registration runs
// during static initialization; ValidateServerAddresses() then vets the
// --voltdb_servers value.
static const bool voltdb_servers_dummy = gflags::RegisterFlagValidator(
&FLAGS_voltdb_servers,
&ValidateServerAddresses);
DEFINE_string(voltdb_username, "",
"[voltdb] Username for connecting to VoltDB servers (if blank, "
"will attempt to use unauthenticated connection).");
DEFINE_string(voltdb_password, "",
"[voltdb] Password for connecting to VoltDB servers.");
#endif // TMB_VOLTDB_ENABLED
// Zookeeper: only offered when memory mirroring is also compiled in.
#if defined(TMB_ZOOKEEPER_ENABLED) && defined(TMB_MEMORYMIRROR_ENABLED)
#include "tmb/zookeeper_message_bus.h"
DEFINE_string(zookeeper_servers, "",
"[zookeeper] A comma-separated list of Zookeeper servers to "
"connect to, each of which is given in HOSTNAME:PORT or IP:PORT "
"format.");
// As for VoltDB: register ValidateServerAddresses() for --zookeeper_servers
// during static initialization.
static const bool zookeeper_servers_dummy = gflags::RegisterFlagValidator(
&FLAGS_zookeeper_servers,
&ValidateServerAddresses);
DEFINE_string(zookeeper_path_prefix, "",
"[zookeeper] The prefix in the zookeeper pseudo-filesystem tree "
"to store TMB data under.");
#endif // TMB_ZOOKEEPER_ENABLED && TMB_MEMORYMIRROR_ENABLED
// ----------------------------------------------------------------------------
// Common flags for setting up server.
// --enable_deletion selects between the <true> and <false> instantiations of
// the bus templates in CreateMessageBus() and CreateMemoryMirror().
DEFINE_bool(enable_deletion, true,
"Enable separate deletion support for bus implementations where "
"it is optional (purememory, nativelog, or any bus with memory "
"mirroring enabled). For other busses, separate deletion support "
"is always available regardless of this flag.");
// gflags validator for --tmb_implementation.
//
// flagname: name of the flag being validated (used only in the error message).
// value: the candidate implementation name.
//
// Returns true if 'value' names an implementation that was compiled into this
// binary; otherwise prints a diagnostic to stderr and returns false.
static bool ValidateTmbImplementation(const char *flagname,
                                      const std::string &value) {
  // Names of the implementations available in this build. The trailing
  // nullptr terminates the list and keeps the array non-empty even when no
  // implementation is enabled.
  static const char *const kCompiledIn[] = {
#if defined(TMB_LEVELDB_ENABLED) && defined(TMB_MEMORYMIRROR_ENABLED)
    "leveldb",
#endif  // TMB_LEVELDB_ENABLED && TMB_MEMORYMIRROR_ENABLED
#ifdef TMB_NATIVELOG_ENABLED
    "nativelog",
#endif  // TMB_NATIVELOG_ENABLED
#ifdef TMB_PUREMEMORY_ENABLED
    "purememory",
#endif  // TMB_PUREMEMORY_ENABLED
#ifdef TMB_SQLITE_ENABLED
    "sqlite",
#endif  // TMB_SQLITE_ENABLED
#ifdef TMB_VOLTDB_ENABLED
    "voltdb",
#endif  // TMB_VOLTDB_ENABLED
#if defined(TMB_ZOOKEEPER_ENABLED) && defined(TMB_MEMORYMIRROR_ENABLED)
    "zookeeper",
#endif  // TMB_ZOOKEEPER_ENABLED && TMB_MEMORYMIRROR_ENABLED
    nullptr
  };

  for (const char *const *candidate = kCompiledIn;
       *candidate != nullptr;
       ++candidate) {
    if (value == *candidate) {
      return true;
    }
  }

  std::fprintf(stderr,
               "Invalid value for --%s: %s\n", flagname, value.c_str());
  return false;
}
// --tmb_implementation picks the bus that CreateMessageBus() constructs.
DEFINE_string(tmb_implementation, "purememory",
"Which underlying TMB implementation to use. Valid choices are "
"leveldb, nativelog, purememory, sqlite, voltdb, and zookeeper. "
"Depending on how the TMB library was built, some of these may "
"not be available.");
// The dummy constant exists only so that the validator registration runs
// during static initialization; ValidateTmbImplementation() rejects names of
// implementations that were not compiled in.
static const bool tmb_implementation_dummy = gflags::RegisterFlagValidator(
&FLAGS_tmb_implementation,
&ValidateTmbImplementation);
// NOTE(chasseur): Port 4575 is towards the middle of a decently-sized block of
// unassigned port numbers (as of 2015-04-21). If we do a public release of
// TMB, it might be worth trying to secure an IANA registration for a port.
// --server_address is passed to grpc::ServerBuilder::AddListeningPort() in
// main().
DEFINE_string(server_address, "0.0.0.0:4575",
"The address and port to listen for connections on.");
// When set, main() calls ResetBus() on the underlying bus right after it has
// been initialized (and before any memory mirror is created).
DEFINE_bool(reset_bus, false,
"If true, reset the bus and clear any existing state on startup.");
// ----------------------------------------------------------------------------
// Function to actually create a MessageBus instance as specified by flags.
// Constructs the MessageBus implementation named by --tmb_implementation,
// configured from that implementation's flags.
//
// Returns a bus allocated with 'new' (not yet initialized), or nullptr if the
// requested implementation is not compiled into this binary or if its server
// list flag cannot be parsed.
tmb::MessageBus* CreateMessageBus() {
#if defined(TMB_LEVELDB_ENABLED) && defined(TMB_MEMORYMIRROR_ENABLED)
  if (FLAGS_tmb_implementation == "leveldb") {
    return new tmb::LevelDBMessageBus(FLAGS_leveldb_path,
                                      FLAGS_leveldb_sync_writes);
  }
#endif

#ifdef TMB_NATIVELOG_ENABLED
  if (FLAGS_tmb_implementation == "nativelog") {
    // Deletion support is a template parameter, so the runtime flag has to
    // pick between the two instantiations.
    if (FLAGS_enable_deletion) {
      return new tmb::NativeLoggingMessageBus<true>(
          FLAGS_nativelog_filename,
          FLAGS_nativelog_sync_writes,
          true);
    } else {
      return new tmb::NativeLoggingMessageBus<false>(
          FLAGS_nativelog_filename,
          FLAGS_nativelog_sync_writes,
          true);
    }
  }
#endif

#ifdef TMB_PUREMEMORY_ENABLED
  if (FLAGS_tmb_implementation == "purememory") {
    if (FLAGS_enable_deletion) {
      return new tmb::PureMemoryMessageBus<true>(true);
    } else {
      return new tmb::PureMemoryMessageBus<false>(true);
    }
  }
#endif

#ifdef TMB_SQLITE_ENABLED
  if (FLAGS_tmb_implementation == "sqlite") {
    return new tmb::SQLiteMessageBus(
        FLAGS_sqlite_filename,
        FLAGS_sqlite_use_wal);
  }
#endif

#ifdef TMB_VOLTDB_ENABLED
  if (FLAGS_tmb_implementation == "voltdb") {
    // The flag validator should already have vetted the server list, but check
    // the result explicitly (rather than via assert, which vanishes under
    // NDEBUG) and do so before allocating the bus so nothing leaks on failure.
    std::vector<ServerAddress> addresses;
    if (!ParseServerAddresses(FLAGS_voltdb_servers, &addresses)) {
      return nullptr;
    }

    // A blank username selects the default (unauthenticated) constructor.
    tmb::VoltDBMessageBus *bus = FLAGS_voltdb_username.empty()
        ? new tmb::VoltDBMessageBus()
        : new tmb::VoltDBMessageBus(FLAGS_voltdb_username,
                                    FLAGS_voltdb_password);
    for (const ServerAddress &address : addresses) {
      bus->AddServer(address.hostname, address.port);
    }
    return bus;
  }
#endif

#if defined(TMB_ZOOKEEPER_ENABLED) && defined(TMB_MEMORYMIRROR_ENABLED)
  if (FLAGS_tmb_implementation == "zookeeper") {
    // Same reasoning as for VoltDB: parse first, fail cleanly.
    std::vector<ServerAddress> addresses;
    if (!ParseServerAddresses(FLAGS_zookeeper_servers, &addresses)) {
      return nullptr;
    }

    tmb::ZookeeperMessageBus *bus = new tmb::ZookeeperMessageBus(
        FLAGS_zookeeper_path_prefix);
    for (const ServerAddress &address : addresses) {
      bus->AddServer(address.hostname, address.port);
    }
    return bus;
  }
#endif

  // Unknown implementation, or one that was not compiled in.
  return nullptr;
}
// ----------------------------------------------------------------------------
// Functions to create a MemoryMirrorMessageBus as needed.
#ifdef TMB_MEMORYMIRROR_ENABLED
// Returns true if the implementation chosen by --tmb_implementation ("leveldb"
// or "zookeeper") is one that main() refuses to run without a memory mirror.
bool MemoryMirrorRequired() {
  const bool is_leveldb = (FLAGS_tmb_implementation == "leveldb");
  return is_leveldb || (FLAGS_tmb_implementation == "zookeeper");
}
// Returns true if main() should wrap the underlying bus in a memory mirror.
// That is never the case for "nativelog" or "purememory"; for every other
// implementation the answer is the value of --memory_mirror.
bool MemoryMirrorEnabled() {
  const bool mirror_not_applicable =
      (FLAGS_tmb_implementation == "nativelog")
      || (FLAGS_tmb_implementation == "purememory");
  return !mirror_not_applicable && FLAGS_memory_mirror;
}
// Helper for CreateMemoryMirror(): downcasts 'underlying_bus' to its concrete
// type and wraps it in a MemoryMirrorMessageBus, choosing the <true> or
// <false> instantiation according to --enable_deletion.
template <typename UnderlyingBusT>
static tmb::MessageBus* WrapInMemoryMirror(tmb::MessageBus *underlying_bus) {
  if (FLAGS_enable_deletion) {
    return new tmb::MemoryMirrorMessageBus<true>(
        static_cast<UnderlyingBusT*>(underlying_bus),
        true);
  }
  return new tmb::MemoryMirrorMessageBus<false>(
      static_cast<UnderlyingBusT*>(underlying_bus),
      true);
}

// Creates a MemoryMirrorMessageBus on top of 'underlying_bus'.
//
// underlying_bus: the bus previously returned by CreateMessageBus(); it is
//     downcast to the concrete type implied by --tmb_implementation.
//
// Returns the newly allocated mirror (not yet initialized), or nullptr if the
// selected implementation has no mirrorable bus compiled in.
tmb::MessageBus* CreateMemoryMirror(tmb::MessageBus *underlying_bus) {
#ifdef TMB_LEVELDB_ENABLED
  if (FLAGS_tmb_implementation == "leveldb") {
    return WrapInMemoryMirror<tmb::LevelDBMessageBus>(underlying_bus);
  }
#endif
#ifdef TMB_SQLITE_ENABLED
  if (FLAGS_tmb_implementation == "sqlite") {
    return WrapInMemoryMirror<tmb::SQLiteMessageBus>(underlying_bus);
  }
#endif
#ifdef TMB_VOLTDB_ENABLED
  if (FLAGS_tmb_implementation == "voltdb") {
    return WrapInMemoryMirror<tmb::VoltDBMessageBus>(underlying_bus);
  }
#endif
#ifdef TMB_ZOOKEEPER_ENABLED
  if (FLAGS_tmb_implementation == "zookeeper") {
    return WrapInMemoryMirror<tmb::ZookeeperMessageBus>(underlying_bus);
  }
#endif
  return nullptr;
}
#endif // TMB_MEMORYMIRROR_ENABLED
// ----------------------------------------------------------------------------
// Server main()
// Entry point: parses flags, builds and initializes the configured message
// bus (optionally wrapped in a memory mirror), then serves it over gRPC until
// the server stops.
//
// Returns 0 after the server has finished waiting, or 1 if any setup step
// fails (progress and "OK"/"FAILED" are reported on stdout, usage errors on
// stderr).
int main(int argc, char **argv) {
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  if (argc != 1) {
    // Anything left besides the program name was not a recognized flag.
    std::fprintf(
        stderr,
        "Unrecognized command-line arguments. Try running %s --help\n",
        argv[0]);
    return 1;
  }

#ifdef TMB_MEMORYMIRROR_ENABLED
  if (MemoryMirrorRequired() && !MemoryMirrorEnabled()) {
    std::fprintf(stderr,
                 "You have selected tmb_implementation %s, which requires "
                 "memory_mirror set to true\n",
                 FLAGS_tmb_implementation.c_str());
    return 1;
  }
#endif  // TMB_MEMORYMIRROR_ENABLED

  grpc_init();

  std::printf("Creating message bus... ");
  std::fflush(stdout);
  tmb::MessageBus *bus = CreateMessageBus();
  if (bus != nullptr) {
    std::printf("OK\n");
  } else {
    std::printf("FAILED\n");
    return 1;
  }

  std::printf("Initializing bus... ");
  std::fflush(stdout);
  if (bus->Initialize()) {
    std::printf("OK\n");
  } else {
    std::printf("FAILED\n");
    delete bus;
    return 1;
  }

  if (FLAGS_reset_bus) {
    std::printf("Resetting bus... ");
    std::fflush(stdout);
    bus->ResetBus();
    std::printf("OK\n");
  }

#ifdef TMB_MEMORYMIRROR_ENABLED
  if (MemoryMirrorEnabled()) {
    std::printf("Creating memory mirror... ");
    std::fflush(stdout);
    tmb::MessageBus *mirror = CreateMemoryMirror(bus);
    if (mirror != nullptr) {
      std::printf("OK\n");
      // From here on all traffic goes through the mirror.
      bus = mirror;
    } else {
      std::printf("FAILED\n");
      delete bus;
      return 1;
    }

    std::printf("Initializing memory mirror (loading state)... ");
    std::fflush(stdout);
    if (bus->Initialize()) {
      std::printf("OK\n");
    } else {
      std::printf("FAILED\n");
      delete bus;
      return 1;
    }
  }
#endif  // TMB_MEMORYMIRROR_ENABLED

  tmb::internal::NetServiceImpl tmb_service(bus);

  // TODO(chasseur): Currently we just start the server with insecure
  // credentials. We should add support for GRPC's built-in TLS encryption and
  // authentication.
  grpc::ServerBuilder builder;
  builder.RegisterService(&tmb_service);
  builder.AddListeningPort(FLAGS_server_address,
                           grpc::InsecureServerCredentials());

  std::printf("Starting server on %s ... ", FLAGS_server_address.c_str());
  std::fflush(stdout);
  std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
  // BuildAndStart() hands back a null pointer when the server could not be
  // started, so the outcome of this step is whether 'server' is non-null.
  // The bus is already initialized and must not be initialized again here.
  if (server) {
    std::printf("OK\n");
  } else {
    std::printf("FAILED\n");
    return 1;
  }

  server->Wait();

  grpc_shutdown();
  return 0;
}