| #include <tr1/functional> |
| #include <thrift/protocol/TBinaryProtocol.h> |
| #include <thrift/async/TAsyncProtocolProcessor.h> |
| #include <thrift/async/TEvhttpServer.h> |
| #include <thrift/async/TEvhttpClientChannel.h> |
| #include "Aggr.h" |
| |
| using std::tr1::bind; |
| using std::tr1::placeholders::_1; |
| |
| using apache::thrift::TException; |
| using apache::thrift::protocol::TBinaryProtocolFactory; |
| using apache::thrift::protocol::TProtocolFactory; |
| using apache::thrift::async::TEvhttpServer; |
| using apache::thrift::async::TAsyncProcessor; |
| using apache::thrift::async::TAsyncBufferProcessor; |
| using apache::thrift::async::TAsyncProtocolProcessor; |
| using apache::thrift::async::TAsyncChannel; |
| using apache::thrift::async::TEvhttpClientChannel; |
| |
| class AggrAsyncHandler : public AggrCobSvIf { |
| protected: |
| struct RequestContext { |
| std::tr1::function<void(std::vector<int32_t> const& _return)> cob; |
| std::vector<int32_t> ret; |
| int pending_calls; |
| }; |
| |
| public: |
| AggrAsyncHandler() |
| : eb_(NULL) |
| , pfact_(new TBinaryProtocolFactory()) |
| { |
| leaf_ports_.push_back(8081); |
| leaf_ports_.push_back(8082); |
| } |
| |
| void addValue(std::tr1::function<void()> cob, const int32_t value) { |
| // Silently drop writes to the aggrgator. |
| return cob(); |
| } |
| |
| void getValues(std::tr1::function<void( |
| std::vector<int32_t> const& _return)> cob, |
| std::tr1::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) { |
| RequestContext* ctx = new RequestContext(); |
| ctx->cob = cob; |
| ctx->pending_calls = leaf_ports_.size(); |
| for (std::vector<int>::iterator it = leaf_ports_.begin(); |
| it != leaf_ports_.end(); ++it) { |
| boost::shared_ptr<TAsyncChannel> channel( |
| new TEvhttpClientChannel( |
| "localhost", "/", "127.0.0.1", *it, eb_)); |
| AggrCobClient* client = new AggrCobClient(channel, pfact_.get()); |
| client->getValues(std::tr1::bind(&AggrAsyncHandler::clientReturn, this, ctx, _1)); |
| } |
| } |
| |
| void setEventBase(struct event_base* eb) { |
| eb_ = eb; |
| } |
| |
| void clientReturn(RequestContext* ctx, AggrCobClient* client) { |
| ctx->pending_calls -= 1; |
| |
| try { |
| std::vector<int32_t> subret; |
| client->recv_getValues(subret); |
| ctx->ret.insert(ctx->ret.end(), subret.begin(), subret.end()); |
| } catch (TException& exn) { |
| // TODO: Log error |
| } |
| |
| delete client; |
| |
| if (ctx->pending_calls == 0) { |
| ctx->cob(ctx->ret); |
| delete ctx; |
| } |
| } |
| |
| protected: |
| struct event_base* eb_; |
| std::vector<int> leaf_ports_; |
| boost::shared_ptr<TProtocolFactory> pfact_; |
| }; |
| |
| |
| int main() { |
| boost::shared_ptr<AggrAsyncHandler> handler(new AggrAsyncHandler()); |
| boost::shared_ptr<TAsyncProcessor> proc(new AggrAsyncProcessor(handler)); |
| boost::shared_ptr<TProtocolFactory> pfact(new TBinaryProtocolFactory()); |
| boost::shared_ptr<TAsyncBufferProcessor> bufproc(new TAsyncProtocolProcessor(proc, pfact)); |
| boost::shared_ptr<TEvhttpServer> server(new TEvhttpServer(bufproc, 8080)); |
| handler->setEventBase(server->getEventBase()); |
| server->serve(); |
| } |