blob: 518929c426e91ceb51eacd412c84dafe3d9741c6 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "../options.hpp"
#include "../broker.hpp"
#include <iostream>
// FIXME aconway 2016-01-23: port broker to windows, need io::poller.
#ifdef WIN32
#include "proton/acceptor.hpp"
#include "proton/container.hpp"
#include "proton/value.hpp"
class broker {
public:
broker(const proton::url& url) : handler_(url, queues_) {}
proton::handler& handler() { return handler_; }
private:
class my_handler : public broker_handler {
public:
my_handler(const proton::url& u, queues& qs) : broker_handler(qs), url_(u) {}
void on_start(proton::event &e) {
e.container().listen(url_);
std::cout << "broker listening on " << url_ << std::endl;
}
private:
const proton::url& url_;
};
private:
queues queues_;
my_handler handler_;
};
int main(int argc, char **argv) {
// Command line options
proton::url url("0.0.0.0");
options opts(argc, argv);
opts.add_value(url, 'a', "address", "listen on URL", "URL");
try {
opts.parse();
broker b(url);
proton::container(b.handler()).run();
return 0;
} catch (const bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}
return 1;
}
#else // WIN32
#include <proton/io.hpp>
#include <sys/select.h>
#include <set>
template <class T> T check(T result, const std::string& msg="io_error: ") {
if (result < 0)
throw proton::io_error(msg + proton::io::error_str());
return result;
}
void fd_set_if(bool on, int fd, fd_set *fds);
class broker {
typedef std::set<proton::io::socket_engine*> engines;
queues queues_;
broker_handler handler_;
proton::connection_engine::container container_;
engines engines_;
fd_set reading_, writing_;
public:
broker() : handler_(queues_) {
FD_ZERO(&reading_);
FD_ZERO(&writing_);
}
~broker() {
for (engines::iterator i = engines_.begin(); i != engines_.end(); ++i)
delete *i;
}
void run(const proton::url& url) {
proton::io::listener listener(url.host(), url.port());
std::cout << "listening on " << url << " fd=" << listener.socket() << std::endl;
FD_SET(listener.socket(), &reading_);
while(true) {
fd_set readable_set = reading_;
fd_set writable_set = writing_;
check(select(FD_SETSIZE, &readable_set, &writable_set, NULL, NULL), "select");
if (FD_ISSET(listener.socket(), &readable_set)) {
std::string client_host, client_port;
int fd = listener.accept(client_host, client_port);
std::cout << "accepted " << client_host << ":" << client_port
<< " fd=" << fd << std::endl;
engines_.insert(new proton::io::socket_engine(fd, handler_, container_.make_options()));
FD_SET(fd, &reading_);
FD_SET(fd, &writing_);
}
for (engines::iterator i = engines_.begin(); i != engines_.end(); ) {
engines::iterator j = i++; // Save iterator in case we need to erase it.
proton::io::socket_engine *eng = *j;
int flags = 0;
if (FD_ISSET(eng->socket(), &readable_set))
flags |= proton::io::socket_engine::READ;
if (FD_ISSET(eng->socket(), &writable_set))
flags |= proton::io::socket_engine::WRITE;
if (flags) eng->process_nothrow(flags);
// Set reading/writing bits for next time around
fd_set_if(eng->can_read(), eng->socket(), &reading_);
fd_set_if(eng->can_write(), eng->socket(), &writing_);
if (eng->closed()) {
std::cout << "closed fd=" << eng->socket() << " "
<< eng->error_str() << std::endl;
engines_.erase(j);
delete eng;
}
}
}
}
};
void fd_set_if(bool on, int fd, fd_set *fds) {
if (on)
FD_SET(fd, fds);
else
FD_CLR(fd, fds);
}
int main(int argc, char **argv) {
// Command line options
std::string address("0.0.0.0");
options opts(argc, argv);
opts.add_value(address, 'a', "address", "listen on URL", "URL");
try {
opts.parse();
broker().run(address);
return 0;
} catch (const bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}
return 1;
}
#endif