blob: 251399e9ad641f7b0eabc8fc0ca42c237a42035b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <iostream>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include <boost/algorithm/string.hpp>
#include <boost/thread/thread.hpp>
#include "gen-cpp/NetworkTest_types.h"
#include "gen-cpp/NetworkTestService.h"
#include "common/init.h"
#include "common/logging.h"
#include "util/cpu-info.h"
#include "util/stopwatch.h"
#include "rpc/thrift-client.h"
#include "rpc/thrift-server.h"
#include "rpc/thrift-thread.h"
#include "common/names.h"
DEFINE_int32(port, 22222, "Port for NetworkTestService");
DEFINE_int64(send_batch_size, 0, "Batch size (in bytes). Data is split up into batches");
// Simple client server network speed benchmark utility. This compiles to
// a binary that runs as both the client and server. The server can be started
// up by just running the binary. After the server starts up, it will drop into
// the client 'shell' where benchmarks can be run.
// The supported benchmarks are:
// 'send <size in mb> <target ip>'
// 'broadcast <size in mb> <list of space separated target ips>
// The command can also be passed in via command line in the same format. If
// run in this mode, the server does not start up.
// For broadcast, the data is sent in parallel to all nodes.
//
// The expected usage for measuring 'send' is to start up the server on one machine
// and issue the send from another.
// For 'broadcast', the server should be started on all the machines and then the
// broadcast is issued from one of them.
using boost::algorithm::is_any_of;
using boost::algorithm::token_compress_on;
using boost::algorithm::split;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::server;
using namespace apache::thrift::concurrency;
using namespace impala;
using namespace impalatest;
class TestServer : public NetworkTestServiceIf {
public:
TestServer() {
}
virtual ~TestServer() {
}
virtual void Send(ThriftDataResult& result, const ThriftDataParams& params) {
result.__set_bytes_received(params.data.size());
}
void Server(ThriftServer* server) {
ABORT_IF_ERROR(server->Start());
server->Join();
}
};
// Send bytes to client respecting the batch size
// Returns the rate in mb/s to send the data.
double Send(ThriftClient<NetworkTestServiceClient>* client, int64_t bytes) {
int64_t batch_size = FLAGS_send_batch_size;
if (batch_size == 0) batch_size = bytes;
int64_t total_sent = 0;
MonotonicStopWatch timer;
timer.Start();
while (total_sent < bytes) {
int64_t send_size = min(bytes - total_sent, batch_size);
total_sent += send_size;
ThriftDataParams data;
ThriftDataResult result;
data.data.resize(send_size);
client->iface()->Send(result, data);
if (result.bytes_received != send_size) {
return -1;
}
}
timer.Stop();
double mb = bytes / (1024. * 1024.);
double sec = timer.ElapsedTime() / (1000.) / (1000.) / (1000.);
return mb/sec;
}
// Send tokens[1] megabytes to tokens[2]
void HandleSend(const vector<string>& tokens) {
if (tokens.size() != 3) {
return;
}
int64_t mbs = atoi(tokens[1].c_str());
int64_t bytes = mbs * (1024L * 1024L);
cout << "Sending " << mbs << " megabytes..." << endl;
const string& ip = tokens[2];
ThriftClient<NetworkTestServiceClient> client(ip, FLAGS_port);
Status status = client.Open();
if (!status.ok()) {
cerr << "Could not connect to server" << endl;
return;
}
double rate = Send(&client, bytes);
if (rate < 0) {
cerr << "Send failed";
return;
}
cout << "Send rate: (MB/s): " << rate << endl;
}
// Broadcast tokens[1] megabytes to tokens[2...n] nodes in parallel.
void HandleBroadcast(const vector<string>& tokens) {
if (tokens.size() <= 2) {
return;
}
int64_t mbs = atoi(tokens[1].c_str());
int64_t bytes = mbs * (1024L * 1024L);
cout << "Broadcasting " << mbs << " megabytes..." << endl;
vector<ThriftClient<NetworkTestServiceClient>* > clients;
for (int i = 2; i < tokens.size(); ++i) {
ThriftClient<NetworkTestServiceClient>* client =
new ThriftClient<NetworkTestServiceClient>(tokens[i], FLAGS_port);
Status status = client->Open();
if (!status.ok()) {
cerr << "Could not connect to server: " << tokens[i] << endl;
return;
}
clients.push_back(client);
}
MonotonicStopWatch timer;
timer.Start();
thread_group threads;
for (int i = 0; i < clients.size(); ++i) {
threads.add_thread(new thread(Send, clients[i], bytes));
}
threads.join_all();
timer.Stop();
double mb = bytes / (1024 * 1024.);
double sec = timer.ElapsedTime() / (1000.) / (1000.) / (1000.);
cout << "Send rate per node: (MB/s) " << (mb/sec) << endl;
cout << "Send rate cluster: (MB/s) " << (mb * clients.size() / sec) << endl;
}
void ConvertToLowerCase(vector<string>* tokens) {
for (int i = 0; i < tokens->size(); ++i) {
transform(
(*tokens)[i].begin(), (*tokens)[i].end(), (*tokens)[i].begin(), ::tolower);
}
}
bool ProcessCommand(const vector<string>& tokens) {
if (tokens.empty()) return false;
if (tokens[0] == "quit") return true;;
if (tokens[0] == "send") {
HandleSend(tokens);
} else if (tokens[0] == "broadcast") {
HandleBroadcast(tokens);
} else {
cerr << "Invalid command" << endl;
return false;
}
return false;
}
int main(int argc, char** argv) {
google::ParseCommandLineFlags(&argc, &argv, true);
impala::InitCommonRuntime(argc, argv, false, impala::TestInfo::BE_TEST);
if (argc != 1) {
// Just run client from command line args
vector<string> tokens;
for (int i = 1; i < argc; ++i) {
tokens.push_back(argv[i]);
}
ConvertToLowerCase(&tokens);
ProcessCommand(tokens);
return 0;
}
// Start up server and client shell
boost::shared_ptr<TestServer> handler(new TestServer);
boost::shared_ptr<ThreadFactory> thread_factory(
new ThriftThreadFactory("test", "test"));
boost::shared_ptr<TProcessor> processor(new NetworkTestServiceProcessor(handler));
ThriftServer* server;
ABORT_IF_ERROR(ThriftServerBuilder("Network Test Server", processor, FLAGS_port)
.max_concurrent_connections(100)
.Build(&server));
thread* server_thread = new thread(&TestServer::Server, handler.get(), server);
string input;
while (1) {
vector<string> tokens;
cout << "> ";
cout.flush();
getline(cin, input);
if (cin.eof()) break;
split(tokens, input, is_any_of(" "), token_compress_on);
ConvertToLowerCase(&tokens);
if (ProcessCommand(tokens)) break;
}
server->StopForTesting();
server_thread->join();
return 0;
}