blob: 782f1e6c7c685393acd080c07432768a35878fe7 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <stdio.h>
#include <set>
#include <string>
#include <sstream>
#include <unistd.h>
#include <cstdlib>
#include <iostream>
#include <map>
#include <dirent.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/AsyncSession.h>
#include <qpid/client/Message.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/SubscriptionManager.h>
using namespace qpid::client;
using namespace qpid::framing;
using namespace std;
namespace qpid {
namespace tests {
/*
 * Scale the next value of rand() into the range 0 .. max_desired_val
 * and return it truncated to an int.
 */
int
mrand ( int max_desired_val )
{
    double const fraction = static_cast<double> ( rand() )
                          / static_cast<double> ( RAND_MAX );
    double const scaled   = fraction * static_cast<double> ( max_desired_val );
    return static_cast<int> ( scaled );
}
/*
 * Read the whole of the named file into a freshly malloc'ed,
 * NUL-terminated buffer.
 *
 * Returns the buffer, which the caller must free(), or 0 after
 * printing a diagnostic on stderr when the file cannot be opened,
 * measured or read, or when the buffer cannot be allocated.
 * An empty file yields an empty string rather than an error.
 * The file is closed on every path.
 */
char *
file2str ( char const * file_name )
{
    FILE * fp = file_name ? fopen ( file_name, "r" ) : 0;
    if ( ! fp )
    {
        fprintf ( stderr,
                  "file2str error: can't open file |%s|.\n",
                  file_name ? file_name : "(null)"
                );
        return 0;
    }

    // Find the length of the file by seeking to its end.
    long end_pos = -1;
    if ( 0 == fseek ( fp, 0, SEEK_END ) )
        end_pos = ftell ( fp );
    if ( end_pos < 0 )
    {
        fprintf ( stderr,
                  "file2str error: can't get size of file |%s|.\n",
                  file_name
                );
        fclose ( fp );
        return 0;
    }
    rewind ( fp );

    size_t const file_len = (size_t) end_pos;
    char * content = (char *) malloc ( file_len + 1 );
    if ( ! content )
    {
        fprintf ( stderr,
                  "file2str error: can't malloc %d bytes.\n",
                  (int)file_len
                );
        fclose ( fp );
        return 0;
    }

    // Read byte-wise so that a zero-length file is a successful read
    // of zero bytes instead of a failed read of one zero-sized item.
    size_t const bytes_read = fread ( content, 1, file_len, fp );
    fclose ( fp );
    if ( bytes_read != file_len )
    {
        fprintf ( stderr, "file2str error: read failed.\n" );
        free ( content );
        return 0;
    }

    content[file_len] = 0;
    return content;
}
/*
 * A MessageListener that treats each message it receives as a
 * whitespace-separated command line whose first word says who the
 * command is for.  Commands addressed to this server are either
 * handled directly (the "special commands" below) or handed to a
 * child process running the qrsh_run executable.  Replies and
 * announcements are published through send().
 */
class QrshServer : public MessageListener
{
public:
// name is this server's name, qrsh_run_path the executable that is
// run for ordinary commands, host and port the broker that send()
// connects to.  The constructor also creates the data directory.
QrshServer ( SubscriptionManager & subscriptions,
char const * name,
char const * qrsh_run_path,
char const * host,
int port
);
// MessageListener callback: runs the command if it is addressed to me.
virtual void received ( Message & message);
private:
// NOTE(review): all_server_names is not referenced by any method
// visible in this file.
set<string> all_server_names;
// Path of this server's data directory: "/tmp/qrsh_" followed by the pid.
stringstream data_dir;
// Stored by the constructor; not otherwise used by the methods in this file.
SubscriptionManager & subscriptions;
// Is this message addressed to me?
bool myMessage ( Message const & message );
/* ----------------------------------------------
* Special Commands
* These are commands that the qrsh_server executes
* directly, rather than through a child process
* instance of qrsh_run.
*/
// Dispatch on the second word of the message.
void runCommand ( Message const & message );
// "exec": run the rest of the message with system().
void execute ( Message const & message );
// "exec_wait": run qrsh_run, wait for it, send "wait_response <code>".
void wait ( Message const & message );
// "exited": same sequence as wait(), also answering with "wait_response".
void exited ( Message const & message );
// "get": send back the content of a file as "get_response <content>".
void get ( Message const & message );
// Record the sender of a "hello_my_name_is" message in myFellowBrokers.
void rememberIntroduction ( Message const & message );
// Record another server's straw and, once enough are in, see who won.
void getStraw ( Message const & message );
// "alias": add the third word of the message to aliases.
void addAlias ( Message const & message );
// "start": announce my name unconditionally.
void start ( );
// Announce my name unless I have already done so.
void sayHello ( );
// "sayName": print my name on stderr.
void sayName ( );
// end Special Commands ------------------------
// Remember a message so that runSavedCommand() can run it later.
void saveCommand ( Message const & message );
// Publish content to the "amq.fanout" exchange over a new connection.
void send ( string const & content );
// Pick a random straw for myself and publish it.
void drawStraws ( );
// Rebuild abstract_name_map from the entries of the data directory.
void getNames ( );
void runSavedCommand ( );
// Split s at whitespace into a malloc'ed, null-terminated argv array.
char ** getArgs ( char const * s );
// Is s one of the names currently in abstract_name_map?
bool isProcessName ( char const * s );
int string_countWords ( char const * s );
// Return the text following the first word of s, or 0 if there is none.
char const * skipWord ( char const * s );
void string_replaceAll ( string & str,
string & target,
string & replacement
);
// Values copied from the constructor arguments of the same names.
string name,
qrsh_run_path,
host;
// Extra names this server answers to; elements are allocated with new.
vector<string *> aliases;
int port;
// Data-directory entry name -> pid read from that entry's "pid" file.
// The keys are strdup'ed C strings, so the map orders them by pointer
// value, not by content; lookups by name must scan and strcmp.
map < char *, int > abstract_name_map;
// Names of the other servers that have introduced themselves.
set < string > myFellowBrokers;
// True once this server has announced its own name.
bool saidHello;
// The most recent "any" command, kept until the straw draw is decided.
Message savedCommand;
// Straws received from other servers in the current draw.
vector < int > straws;
// My own straw in the current draw; reset to 0 when I win.
int myStraw;
};
/*
 * Store the connection parameters and create this server's data
 * directory, "/tmp/qrsh_" followed by the process id.
 * If the directory cannot be created the process exits with status 1.
 */
QrshServer::QrshServer ( SubscriptionManager & subs,
                         char const * name,
                         char const * qrsh_run_path,
                         char const * host,
                         int port
                       )
    : subscriptions ( subs ),
      name ( name ),
      qrsh_run_path ( qrsh_run_path ),
      host ( host ),
      port ( port ),
      saidHello ( false ),
      myStraw ( 0 )
{
    data_dir << "/tmp/qrsh_" << getpid();

    string const dir_path ( data_dir.str() );
    int const mkdir_result = mkdir ( dir_path.c_str(), 0777 );
    if ( 0 == mkdir_result )
        return;

    fprintf ( stderr,
              "QrshServer::QrshServer error: can't mkdir |%s|\n",
              dir_path.c_str()
            );
    exit ( 1 );
}
// Keep a copy of the message so that runSavedCommand() can run it later.
void
QrshServer::saveCommand ( Message const & message )
{
    this->savedCommand = message;
}
void
QrshServer::runSavedCommand ( )
{
runCommand ( savedCommand );
}
void
QrshServer::start ( )
{
stringstream announcement_data;
announcement_data << "hello_my_name_is "
<< name;
send ( announcement_data.str() );
saidHello = true;
}
/*
 * Publish content as a single message on the "amq.fanout" exchange.
 * A fresh connection and session to host:port are opened for the
 * message and closed again afterwards.  Any exception is reported
 * on stderr and otherwise ignored.
 */
void
QrshServer::send ( string const & content )
{
    try
    {
        Message outgoing;
        outgoing.setData ( content );

        Connection broker_connection;
        broker_connection.open ( host, port );
        Session broker_session = broker_connection.newSession ( );

        broker_session.messageTransfer ( arg::content = outgoing,
                                         arg::destination = "amq.fanout"
                                       );

        broker_session.close();
        broker_connection.close();
    }
    catch ( exception const & error )
    {
        fprintf ( stderr, "QrshServer::send error: |%s|\n", error.what() );
    }
}
void
QrshServer::sayHello ( )
{
if ( saidHello )
return;
stringstream ss;
ss << "hello_my_name_is "
<< name;
send ( ss.str() );
saidHello = true;
}
// Print this server's name on stderr.
void
QrshServer::sayName ( )
{
    char const * const my_name = name.c_str();
    fprintf ( stderr, "My name is: |%s|\n", my_name );
}
void
QrshServer::drawStraws ( )
{
myStraw = mrand ( 1000000000 );
stringstream ss;
ss << "straw "
<< name
<< ' '
<< myStraw;
send ( ss.str() );
}
/*
 * Handle a "straw <server-name> <value>" message.
 *
 * My own straw message is ignored.  Any other server's straw is
 * recorded; once at least as many straws as known fellow brokers
 * have arrived, the draw is decided: if every recorded straw is
 * smaller than mine, I run the saved command.  A tie for the highest
 * straw is reported on stderr and nobody runs the command.
 *
 * Malformed messages (missing name or value) are reported and
 * dropped instead of being parsed into uninitialized storage.
 */
void
QrshServer::getStraw ( Message const & message )
{
    char brokerName[1000];
    int  straw = 0;

    // The field width keeps an over-long name from overrunning brokerName.
    string const & data = message.getData();
    if ( 2 != sscanf ( data.c_str(), "%*s%999s%d", brokerName, & straw ) )
    {
        fprintf ( stderr,
                  "QrshServer::getStraw error: malformed straw message: |%s|\n",
                  data.c_str()
                );
        return;
    }

    // My own announcement comes back to me through the fanout; skip it.
    if ( ! strcmp ( brokerName, name.c_str() ) )
        return;

    straws.push_back ( straw );

    if ( straws.size() < myFellowBrokers.size() )
        return;

    // All votes are in! Let's see if I win!
    bool i_win = true;
    int  ties  = 0;
    for ( unsigned int i = 0; i < straws.size(); ++ i )
    {
        if ( straws[i] == myStraw )
            ++ ties;
        else
        if ( straws[i] > myStraw )
        {
            i_win = false;
            break;
        }
    }

    if ( ! i_win )
        return;

    if ( ties > 0 )
    {
        // Report the value that was actually tied, which is my own straw.
        fprintf ( stderr, "MDEBUG oh no! drawStraws error: server %s tied with straw %d!\n", name.c_str(), myStraw );
        return;
    }

    myStraw = 0;
    straws.clear();
    runSavedCommand ( );
}
/*
 * Is this message addressed to me?
 *
 * Returns true when the first word of the message is my name, one of
 * my aliases, or "all"; the caller then runs the command.
 *
 * "APB" commands (all-points bulletins: commands that are not
 * addressed specifically to any server) are handled directly, here,
 * because if I simply returned true the normal command processing
 * code would misinterpret the command:
 *   hello_my_name_is  remember the sender and introduce myself
 *   straw             record the sender's straw
 *   any               save the command and start a straw draw
 * For these, and for anything else, the result is false.
 * An empty or all-whitespace message is addressed to nobody.
 */
bool
QrshServer::myMessage ( Message const & message )
{
    // Extract the first whitespace-delimited word without any
    // fixed-size buffer.
    string first_word;
    istringstream words ( message.getData() );
    if ( ! ( words >> first_word ) )
        return false;

    if ( first_word == name )
        return true;

    // Is the given name one of my aliases?
    for ( unsigned int i = 0; i < aliases.size(); ++ i )
    {
        if ( first_word == * aliases[i] )
            return true;
    }

    if ( first_word == "hello_my_name_is" )
    {
        rememberIntroduction ( message );
        sayHello ( );
        return false;
    }

    if ( first_word == "straw" )
    {
        getStraw ( message );
        return false;
    }

    if ( first_word == "all" )
        return true;

    if ( first_word == "any" )
    {
        // Only one server should run an "any" command: forget the last
        // draw, pause briefly, then save the command and publish my straw.
        straws.clear();
        usleep ( 200000 );
        saveCommand ( message );
        drawStraws ( );
        return false;
    }

    return false;
}
/*
 * Handle a "hello_my_name_is <server-name>" message: add the sender
 * to myFellowBrokers unless the sender is me.
 * A message without a name is reported and ignored.
 */
void
QrshServer::rememberIntroduction ( Message const & message )
{
    char brokerName [ 1000 ];

    // The field width keeps an over-long name from overrunning brokerName.
    if ( 1 != sscanf ( message.getData().c_str(), "%*s%999s", brokerName ) )
    {
        fprintf ( stderr,
                  "QrshServer::rememberIntroduction error: no name in message: |%s|\n",
                  message.getData().c_str()
                );
        return;
    }

    if ( strcmp ( brokerName, name.c_str() ) )
        myFellowBrokers.insert ( string ( brokerName ) );
}
/*
 * Handle "<server> alias <new-name>": from now on this server also
 * answers to <new-name>.
 * A message without a third word is reported and ignored, so that no
 * garbage alias is ever stored.
 */
void
QrshServer::addAlias ( Message const & message )
{
    char alias[1000];

    // The field width keeps an over-long alias from overrunning the buffer.
    if ( 1 != sscanf ( message.getData().c_str(), "%*s%*s%999s", alias ) )
    {
        fprintf ( stderr,
                  "QrshServer::addAlias error: no alias in message: |%s|\n",
                  message.getData().c_str()
                );
        return;
    }

    aliases.push_back ( new string(alias) );
}
void
QrshServer::getNames ( )
{
abstract_name_map.clear();
DIR * dir = opendir ( data_dir.str().c_str() );
if ( ! dir )
{
fprintf ( stderr,
"QrshServer::getNames error: could not open dir |%s|.\n",
data_dir.str().c_str()
);
return;
}
struct dirent * file;
while ( (file = readdir ( dir ) ) )
{
if ( '.' != file->d_name[0] )
{
stringstream pid_file_name;
pid_file_name << data_dir.str()
<< '/'
<< file->d_name
<< "/pid";
int pid = 0;
FILE * fp;
if ( (fp = fopen ( pid_file_name.str().c_str(), "r" ) ) )
{
fscanf ( fp, "%d", & pid );
fclose ( fp );
abstract_name_map.insert(pair<char*, int>(strdup(file->d_name), pid));
}
else
{
/*
* Fail silently. The non-existence of this file
* is not necessarily an error.
*/
}
}
}
closedir ( dir );
}
/*
 * Replace every occurrence of target in str with replacement.
 *
 * The search resumes after each inserted replacement, so the call
 * terminates even when replacement itself contains target.
 * An empty target matches nothing and leaves str unchanged.
 */
void
QrshServer::string_replaceAll ( string & str,
                                string & target,
                                string & replacement
                              )
{
    if ( target.empty() )
        return;

    string::size_type const target_size      = target.size();
    string::size_type const replacement_size = replacement.size();

    string::size_type found_pos = 0;
    while ( string::npos != ( found_pos = str.find ( target, found_pos ) ) )
    {
        str.replace ( found_pos, target_size, replacement );
        found_pos += replacement_size;
    }
}
bool
QrshServer::isProcessName ( char const * str )
{
getNames();
map<char *, int>::iterator it;
for ( it = abstract_name_map.begin(); it != abstract_name_map.end(); ++ it )
{
if ( ! strcmp ( str, it->first ) )
return true;
}
return false;
}
/*
 * Count the whitespace-separated words in s1.
 * A null pointer, an empty string and an all-whitespace string all
 * contain zero words; the scan never reads past the terminator.
 */
int
QrshServer::string_countWords ( char const * s1 )
{
    int  count   = 0;
    bool in_word = false;

    if ( ! s1 )
        return 0;

    for ( ; * s1; ++ s1 )
    {
        // isspace requires a value representable as unsigned char.
        if ( isspace ( (unsigned char) * s1 ) )
        {
            in_word = false;
        }
        else
        if ( ! in_word )
        {
            // space-to-word transition (or a word at the very start).
            in_word = true;
            ++ count;
        }
    }
    return count;
}
/*
 * Handle "<server> exec <shell command>".
 *
 * Every known symbolic process name occurring in the message is
 * replaced by its pid, the first two words are dropped, and whatever
 * remains is handed to system().  Nothing is run when no text follows
 * the second word.
 */
void
QrshServer::execute ( Message const & message )
{
    // Refresh the symbolic names before substituting them.
    getNames();

    // Work on a private copy of the message text.
    string command ( message.getData() );

    map<char *, int>::iterator entry = abstract_name_map.begin();
    while ( entry != abstract_name_map.end() )
    {
        char pid_text[100];
        sprintf ( pid_text, "%d", entry->second );

        string symbolic_name ( entry->first );
        string pid_string ( pid_text );
        string_replaceAll ( command, symbolic_name, pid_string );

        ++ entry;
    }

    char const * after_server_name = skipWord ( command.c_str() );
    char const * shell_command     = skipWord ( after_server_name );
    if ( shell_command )
        system ( shell_command );
}
/*
 * Handle "<server> get <file-or-process-name> [<output-stream>]".
 *
 * If the third word is a known process name, the file sent back is
 * <data_dir>/<process-name>/<output-stream>; otherwise the third word
 * itself is taken as the file name.  The reply is published as
 * "get_response <content>".  When the file cannot be read the reply
 * carries no content, so that whoever is waiting still gets an answer.
 *
 * A request without a third word, or naming a process without an
 * output stream, is reported on stderr and not answered.
 */
void
QrshServer::get ( Message const & request_message )
{
    string const & request = request_message.getData();

    /*
     * Get the contents of the requested file.
     * The field widths keep over-long words from overrunning the buffers.
     */
    char file_or_process_name[1000];
    if ( 1 != sscanf ( request.c_str(), "%*s%*s%999s", file_or_process_name ) )
    {
        fprintf ( stderr,
                  "QrshServer::get error: Can't read requested file or process name from this message: |%s|\n",
                  request.c_str()
                );
        return;
    }

    char * file_content = 0;
    if ( isProcessName ( file_or_process_name ) )
    {
        char requested_output_stream[1000];
        if ( 1 != sscanf ( request.c_str(),
                           "%*s%*s%*s%999s",
                           requested_output_stream
                         )
           )
        {
            fprintf ( stderr,
                      "QrshServer::get error: Can't read requested data file name from this message: |%s|\n",
                      request.c_str()
                    );
            return;
        }

        stringstream desired_file_name;
        desired_file_name << data_dir.str()
                          << '/'
                          << file_or_process_name
                          << '/'
                          << requested_output_stream;
        file_content = file2str ( desired_file_name.str().c_str() );
    }
    else
    {
        file_content = file2str ( file_or_process_name );
    }

    stringstream reply_data;
    reply_data << "get_response ";
    if ( file_content )
    {
        reply_data << file_content;
        free ( file_content );   // file2str hands over a malloc'ed buffer.
    }

    /*
     * Send a response-message to the server who is waiting.
     */
    send ( reply_data.str() );
}
/*
 * Handle "<server> exited ...".
 *
 * Every known symbolic process name in the message is replaced by
 * its pid, the server name is dropped, and the remainder is passed to
 * a child process running qrsh_run with the data directory as its
 * first argument.  The parent polls once a second until the child
 * terminates and then publishes "wait_response <code>", where <code> is
 *   the child's exit status if it exited normally,
 *   128 + the signal number if it was killed by a signal,
 *   -1 if no child could be started.
 * Nothing happens when no text follows the server name.
 */
void
QrshServer::exited ( Message const & message )
{
    // First, gather all the symbolic names we know.
    getNames();

    // Now make a copy of the command, that I can alter.
    string edited_command ( message.getData() );

    // Replace each occurrence of every abstract name with its pid.
    char pid_str[100];
    map<char *, int>::iterator it;
    for ( it = abstract_name_map.begin(); it != abstract_name_map.end(); ++ it )
    {
        sprintf ( pid_str, "%d", it->second );
        string target ( it->first ),
               replacement ( pid_str );
        string_replaceAll ( edited_command, target, replacement );
    }

    // Skip the service name. That is not used by the child.
    char const * truncated_command = skipWord(edited_command.c_str());
    if ( ! truncated_command )
        return;

    stringstream ss;
    ss << qrsh_run_path
       << ' '
       << data_dir.str()
       << ' '
       << truncated_command;

    int exit_code = -1;

    pid_t const child_pid = fork();
    if ( child_pid < 0 )
    {
        // Without this check waitpid(-1, ...) would wait for any child.
        perror ( "qrsh_server: fork error: " );
        stringstream failure;
        failure << "wait_response "
                << exit_code;
        send ( failure.str() );
        return;
    }

    if ( 0 == child_pid )
    {
        // This is the child.
        char ** argv = getArgs ( ss.str().c_str() );
        if ( argv )
            execv ( qrsh_run_path.c_str(), argv );
        perror ( "qrsh_server: execv error: " );
        // _exit, not exit: the child must not run the parent's
        // atexit handlers or flush the parent's stdio buffers.
        _exit ( 1 );
    }

    // This is the parent.
    int   status = 0;
    pid_t awaited_pid;
    while ( 0 == (awaited_pid = waitpid ( child_pid, & status, WNOHANG)) )
    {
        fprintf ( stderr, "qrsh_server info: parent: waiting for child...\n" );
        sleep(1);
    }
    if ( -1 == awaited_pid )
    {
        fprintf ( stderr, "qrsh_server error awaiting child!\n" );
        exit ( 1 );
    }

    if ( WIFEXITED ( status ) )
        exit_code = WEXITSTATUS ( status );
    else
    if ( WIFSIGNALED ( status ) )
        exit_code = 128 + WTERMSIG ( status );

    stringstream data;
    data << "wait_response "
         << exit_code;
    send ( data.str() );
}
/*
 * Handle "<server> exec_wait ...".
 *
 * Every known symbolic process name in the message is replaced by
 * its pid, the server name is dropped, and the remainder is passed to
 * a child process running qrsh_run with the data directory as its
 * first argument.  The parent polls once a second until the child
 * terminates and then publishes "wait_response <code>", where <code> is
 *   the child's exit status if it exited normally,
 *   128 + the signal number if it was killed by a signal,
 *   -1 if no child could be started.
 * Nothing happens when no text follows the server name.
 *
 * A three-word message ("<server> exec_wait <symbolic-name>") names a
 * command that has already been started.  This function does not
 * treat that case specially: after substitution the pid is simply
 * part of what is passed to qrsh_run.
 */
void
QrshServer::wait ( Message const & message )
{
    // First, gather all the symbolic names we know.
    getNames();

    // Now make a copy of the command, that I can alter.
    string edited_command ( message.getData() );

    // Replace each occurrence of every abstract name with its pid.
    char pid_str[100];
    map<char *, int>::iterator it;
    for ( it = abstract_name_map.begin(); it != abstract_name_map.end(); ++ it )
    {
        sprintf ( pid_str, "%d", it->second );
        string target ( it->first ),
               replacement ( pid_str );
        string_replaceAll ( edited_command, target, replacement );
    }

    // Skip the service name. That is not used by the child.
    char const * truncated_command = skipWord(edited_command.c_str());
    if ( ! truncated_command )
        return;

    stringstream ss;
    ss << qrsh_run_path
       << ' '
       << data_dir.str()
       << ' '
       << truncated_command;

    int exit_code = -1;

    pid_t const child_pid = fork();
    if ( child_pid < 0 )
    {
        // Without this check waitpid(-1, ...) would wait for any child.
        perror ( "qrsh_server: fork error: " );
        stringstream failure;
        failure << "wait_response "
                << exit_code;
        send ( failure.str() );
        return;
    }

    if ( 0 == child_pid )
    {
        // This is the child.
        char ** argv = getArgs ( ss.str().c_str() );
        if ( argv )
            execv ( qrsh_run_path.c_str(), argv );
        perror ( "qrsh_server: execv error: " );
        // _exit, not exit: the child must not run the parent's
        // atexit handlers or flush the parent's stdio buffers.
        _exit ( 1 );
    }

    // This is the parent.
    int   status = 0;
    pid_t awaited_pid;
    while ( 0 == (awaited_pid = waitpid ( child_pid, & status, WNOHANG)) )
    {
        fprintf ( stderr, "qrsh_server info: parent: waiting for child...\n" );
        sleep(1);
    }
    if ( -1 == awaited_pid )
    {
        fprintf ( stderr, "qrsh_server error awaiting child!\n" );
        exit ( 1 );
    }

    if ( WIFEXITED ( status ) )
        exit_code = WEXITSTATUS ( status );
    else
    if ( WIFSIGNALED ( status ) )
        exit_code = 128 + WTERMSIG ( status );

    stringstream data;
    data << "wait_response "
         << exit_code;
    send ( data.str() );
}
/*
 * Step over leading whitespace and the word that follows it.
 *
 * Returns a pointer to the first whitespace character after that
 * word, or 0 when s is null, empty, all whitespace, or ends with the
 * word (so that nothing follows it).
 */
char const *
QrshServer::skipWord ( char const * s )
{
    if ( ! s )
        return 0;

    // skip past initial white space
    while ( *s && isspace(*s) )
        ++ s;
    if ( ! *s )
        return 0;

    // skip past first word
    while ( *s && ! isspace(*s) )
        ++ s;

    return *s ? s : 0;
}
char **
QrshServer::getArgs ( char const * str )
{
char const * s = str;
char ** argv = 0;
vector<int> start_positions,
lengths;
int pos = 0;
int arg_len = 0;
int n_args = 0;
while ( 1 )
{
// advance over whitespace.
while ( isspace ( *s ) )
{
++ s; ++ pos;
if(! *s)
{
goto done;
}
}
++ n_args;
start_positions.push_back ( pos );
arg_len = 0;
// advance over non-whitespace.
while ( ! isspace ( *s ) )
{
++ s; ++ pos; ++ arg_len;
if(! *s)
{
lengths.push_back ( arg_len );
arg_len = 0;
goto done;
}
}
lengths.push_back ( arg_len );
arg_len = 0;
}
done:
if ( arg_len > 0 )
lengths.push_back ( arg_len );
// Alloc the array.
argv = (char **) malloc ( sizeof(char *) * ( n_args + 1 ) );
argv[n_args] = 0; // mull-term the array.
for ( int i = 0; i < n_args; ++ i )
{
argv[i] = ( char *) malloc ( lengths[i] + 1 );
strncpy ( argv[i],
str + start_positions[i],
lengths[i]
);
argv[i][lengths[i]] = 0;
}
return argv;
}
/*
 * Run the command carried by a message that is addressed to me.
 *
 * The first word (the addressee) is skipped and the second word
 * selects the command.  "get", "exited", "exec_wait", "exec", "start",
 * "alias" and "sayName" are special commands, handled by this server
 * itself.  Anything else is a "normal" command: it is run by a child
 * process instance of qrsh_run, which is given the data directory
 * followed by the rest of the message, and which is not waited for.
 *
 * A message with no second word is reported on stderr and ignored.
 */
void
QrshServer::runCommand ( Message const & message )
{
    string const & data = message.getData();
    char const * s = data.c_str();

    /*
     * Skip the first word, which is this server's name.
     * Every loop also stops at the terminator, so a one-word
     * message cannot send the scan past the end of the text.
     */
    while ( *s && isspace ( (unsigned char) *s ) )   // go to start of first word.
        ++ s;
    while ( *s && ! isspace ( (unsigned char) *s ) ) // go to end of first word.
        ++ s;
    while ( *s && isspace ( (unsigned char) *s ) )   // go to start of second word.
        ++ s;

    // The field width keeps an over-long word from overrunning the buffer.
    char command_name[1000];
    if ( 1 != sscanf ( s, "%999s", command_name ) )
    {
        fprintf ( stderr,
                  "QrshServer::runCommand error: no command in message: |%s|\n",
                  data.c_str()
                );
        return;
    }

    if ( ! strcmp ( "get", command_name ) )
    {
        get ( message );
    }
    else
    if ( ! strcmp ( "exited", command_name ) )
    {
        exited ( message );
    }
    else
    if ( ! strcmp ( "exec_wait", command_name ) )
    {
        wait ( message );
    }
    else
    if ( ! strcmp ( "exec", command_name ) )
    {
        execute ( message );
    }
    else
    if ( ! strcmp ( "start", command_name ) )
    {
        start ( );
    }
    else
    if ( ! strcmp ( "alias", command_name ) )
    {
        addAlias ( message );
    }
    else
    if ( ! strcmp ( "sayName", command_name ) )
    {
        sayName ( );
    }
    else
    {
        /*
         * If the command is not any of the "special" commands
         * above, then it's a "normal" command.
         * That means we run it with a child process instance of
         * qrsh_run, which will save all its data in the qrsh dir.
         */
        stringstream ss;
        ss << qrsh_run_path
           << ' '
           << data_dir.str()
           << ' '
           << s;

        pid_t const child_pid = fork();
        if ( child_pid < 0 )
        {
            perror ( "qrsh_server: fork error: " );
        }
        else
        if ( 0 == child_pid )
        {
            char ** argv = getArgs ( ss.str().c_str() );
            if ( argv )
                execv ( qrsh_run_path.c_str(), argv );
            perror ( "qrsh_server: execv error: " );
            // The child must stop here.  Returning would leave a second
            // copy of the server running in the message loop, and exit()
            // would run the parent's atexit handlers in the child.
            _exit ( 1 );
        }
    }
}
/*
 * MessageListener callback.  The message is first offered to
 * myMessage(), which handles broadcast traffic itself; only when it
 * reports that the message is addressed to this server is the
 * command run.
 */
void
QrshServer::received ( Message & message )
{
    bool const addressed_to_me = myMessage ( message );
    if ( ! addressed_to_me )
        return;
    runCommand ( message );
}
}} // namespace qpid::tests
using namespace qpid::tests;
/*
* fixme mick Mon Aug 3 10:29:26 EDT 2009
* argv[1] server name
* argv[2] qrsh exe path
* argv[3] host
* argv[4] port
*/
int
main ( int /*argc*/, char** argv )
{
const char* host = argv[3];
int port = atoi(argv[4]);
Connection connection;
Message msg;
srand ( getpid() );
try
{
connection.open ( host, port );
Session session = connection.newSession();
// Declare queues.
string myQueue = session.getId().getName();
session.queueDeclare ( arg::queue=myQueue,
arg::exclusive=true,
arg::autoDelete=true);
session.exchangeBind ( arg::exchange="amq.fanout",
arg::queue=myQueue,
arg::bindingKey="my-key");
// Create a server and subscribe it to my queue.
SubscriptionManager subscriptions ( session );
QrshServer server ( subscriptions,
argv[1], // server name
argv[2], // qrsh exe path
host,
port
);
subscriptions.subscribe ( server, myQueue );
// Receive messages until the subscription is cancelled
// by QrshServer::received()
subscriptions.run();
connection.close();
}
catch(const exception& error)
{
cout << error.what() << endl;
return 1;
}
return 0;
}