blob: 85c0affc265985f37bcd1be3b86adc500cbc9091 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef THRIFT_TMULTIPLEXEDPROCESSOR_H_
#define THRIFT_TMULTIPLEXEDPROCESSOR_H_ 1
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include <thrift/protocol/TProtocolDecorator.h>
#include <thrift/TApplicationException.h>
#include <thrift/TProcessor.h>
#include <boost/tokenizer.hpp>
namespace apache {
namespace thrift {
namespace protocol {
/**
* Protocol decorator that replays an already-parsed message header.
*
* To be able to work with any protocol, the processors we delegate to must be
* able to call readMessageBegin() and get a TMessage in exactly the standard
* format, without the service name prepended to TMessage.name.
*/
class StoredMessageProtocol : public TProtocolDecorator {
public:
StoredMessageProtocol(std::shared_ptr<protocol::TProtocol> _protocol,
const std::string& _name,
const TMessageType _type,
const int32_t _seqid)
: TProtocolDecorator(_protocol), name(_name), type(_type), seqid(_seqid) {}
uint32_t readMessageBegin_virt(std::string& _name, TMessageType& _type, int32_t& _seqid) override {
_name = name;
_type = type;
_seqid = seqid;
return 0; // (Normal TProtocol read functions return number of bytes read)
}
std::string name;
TMessageType type;
int32_t seqid;
};
} // namespace protocol
/**
* <code>TMultiplexedProcessor</code> is a <code>TProcessor</code> allowing
* a single <code>TServer</code> to provide multiple services.
*
* <p>To do so, you instantiate the processor and then register additional
* processors with it, as shown in the following example:</p>
*
* <blockquote><code>
* std::shared_ptr<TMultiplexedProcessor> processor(new TMultiplexedProcessor());
*
* processor->registerProcessor(
* "Calculator",
* std::shared_ptr<TProcessor>( new CalculatorProcessor(
* std::shared_ptr<CalculatorHandler>( new CalculatorHandler()))));
*
* processor->registerProcessor(
* "WeatherReport",
* std::shared_ptr<TProcessor>( new WeatherReportProcessor(
* std::shared_ptr<WeatherReportHandler>( new WeatherReportHandler()))));
*
* std::shared_ptr<TServerTransport> transport(new TServerSocket(9090));
* TSimpleServer server(processor, transport);
*
* server.serve();
* </code></blockquote>
*/
class TMultiplexedProcessor : public TProcessor {
public:
typedef std::map<std::string, std::shared_ptr<TProcessor> > services_t;
/**
* 'Register' a service with this <code>TMultiplexedProcessor</code>. This
* allows us to broker requests to individual services by using the service
* name to select them at request time.
*
* \param [in] serviceName Name of a service, has to be identical to the name
* declared in the Thrift IDL, e.g. "WeatherReport".
* \param [in] processor Implementation of a service, usually referred to
* as "handlers", e.g. WeatherReportHandler,
* implementing WeatherReportIf interface.
*/
void registerProcessor(const std::string& serviceName, std::shared_ptr<TProcessor> processor) {
services[serviceName] = processor;
}
/**
* Register a service to be called to process queries without service name
* \param [in] processor Implementation of a service.
*/
void registerDefault(const std::shared_ptr<TProcessor>& processor) {
defaultProcessor = processor;
}
/**
* Chew up invalid input and return an exception to throw.
*/
TException protocol_error(std::shared_ptr<protocol::TProtocol> in,
std::shared_ptr<protocol::TProtocol> out,
const std::string& name,
int32_t seqid,
const std::string& msg) const {
in->skip(::apache::thrift::protocol::T_STRUCT);
in->readMessageEnd();
in->getTransport()->readEnd();
::apache::thrift::TApplicationException
x(::apache::thrift::TApplicationException::PROTOCOL_ERROR,
"TMultiplexedProcessor: " + msg);
out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid);
x.write(out.get());
out->writeMessageEnd();
out->getTransport()->writeEnd();
out->getTransport()->flush();
return TException(msg);
}
/**
* This implementation of <code>process</code> performs the following steps:
*
* <ol>
* <li>Read the beginning of the message.</li>
* <li>Extract the service name from the message.</li>
* <li>Using the service name to locate the appropriate processor.</li>
* <li>Dispatch to the processor, with a decorated instance of TProtocol
* that allows readMessageBegin() to return the original TMessage.</li>
* </ol>
*
* \throws TException If the message type is not T_CALL or T_ONEWAY, if
* the service name was not found in the message, or if the service
* name was not found in the service map.
*/
bool process(std::shared_ptr<protocol::TProtocol> in,
std::shared_ptr<protocol::TProtocol> out,
void* connectionContext) override {
std::string name;
protocol::TMessageType type;
int32_t seqid;
// Use the actual underlying protocol (e.g. TBinaryProtocol) to read the
// message header. This pulls the message "off the wire", which we'll
// deal with at the end of this method.
in->readMessageBegin(name, type, seqid);
if (type != protocol::T_CALL && type != protocol::T_ONEWAY) {
// Unexpected message type.
throw protocol_error(in, out, name, seqid, "Unexpected message type");
}
// Extract the service name
boost::tokenizer<boost::char_separator<char> > tok(name, boost::char_separator<char>(":"));
std::vector<std::string> tokens;
std::copy(tok.begin(), tok.end(), std::back_inserter(tokens));
// A valid message should consist of two tokens: the service
// name and the name of the method to call.
if (tokens.size() == 2) {
// Search for a processor associated with this service name.
auto it = services.find(tokens[0]);
if (it != services.end()) {
std::shared_ptr<TProcessor> processor = it->second;
// Let the processor registered for this service name
// process the message.
return processor
->process(std::shared_ptr<protocol::TProtocol>(
new protocol::StoredMessageProtocol(in, tokens[1], type, seqid)),
out,
connectionContext);
} else {
// Unknown service.
throw protocol_error(in, out, name, seqid,
"Unknown service: " + tokens[0] +
". Did you forget to call registerProcessor()?");
}
} else if (tokens.size() == 1) {
if (defaultProcessor) {
// non-multiplexed client forwards to default processor
return defaultProcessor
->process(std::shared_ptr<protocol::TProtocol>(
new protocol::StoredMessageProtocol(in, tokens[0], type, seqid)),
out,
connectionContext);
} else {
throw protocol_error(in, out, name, seqid,
"Non-multiplexed client request dropped. "
"Did you forget to call defaultProcessor()?");
}
} else {
throw protocol_error(in, out, name, seqid,
"Wrong number of tokens.");
}
}
private:
/** Map of service processor objects, indexed by service names. */
services_t services;
//! If a non-multi client requests something, it goes to the
//! default processor (if one is defined) for backwards compatibility.
std::shared_ptr<TProcessor> defaultProcessor;
};
}
}
#endif // THRIFT_TMULTIPLEXEDPROCESSOR_H_