blob: 68a0f4d46cb6713695de564b1752314faae91ecc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <string.h>
#include <stdio.h>
#include <thrift/c_glib/thrift.h>
#include <thrift/c_glib/processor/thrift_processor.h>
#include <thrift/c_glib/processor/thrift_multiplexed_processor.h>
#include <thrift/c_glib/protocol/thrift_multiplexed_protocol.h>
#include <thrift/c_glib/protocol/thrift_stored_message_protocol.h>
#include <thrift/c_glib/thrift_application_exception.h>
G_DEFINE_TYPE(ThriftMultiplexedProcessor, thrift_multiplexed_processor, THRIFT_TYPE_PROCESSOR)
enum
{
PROP_THRIFT_MULTIPLEXED_PROCESSOR_DEFAULT_SERVICE_NAME = 1,
PROP_THRIFT_MULTIPLEXED_PROCESSOR_END
};
static GParamSpec *thrift_multiplexed_processor_obj_properties[PROP_THRIFT_MULTIPLEXED_PROCESSOR_END] = { NULL, };
static gboolean
thrift_multiplexed_processor_register_processor_impl(ThriftProcessor *processor, const gchar * multiplexed_processor_name, ThriftProcessor * multiplexed_processor , GError **error)
{
ThriftMultiplexedProcessor *self = THRIFT_MULTIPLEXED_PROCESSOR(processor);
g_hash_table_replace(self->multiplexed_services,
g_strdup(multiplexed_processor_name),
g_object_ref (multiplexed_processor));
/* Make first registered become default */
if(!self->default_processor_name){
self->default_processor_name = g_strdup(multiplexed_processor_name);
}
return TRUE;
}
static gboolean
thrift_multiplexed_processor_process_impl (ThriftProcessor *processor, ThriftProtocol *in,
ThriftProtocol *out, GError **error)
{
gboolean retval = FALSE;
gboolean token_error = FALSE;
ThriftApplicationException *xception;
ThriftStoredMessageProtocol *stored_message_protocol = NULL;
ThriftMessageType message_type;
ThriftMultiplexedProcessor *self = THRIFT_MULTIPLEXED_PROCESSOR(processor);
ThriftProcessor *multiplexed_processor = NULL;
ThriftTransport *transport;
char *token=NULL;
int token_index=0;
char *state=NULL;
gchar *fname=NULL;
gint32 seqid, result;
/* FIXME It seems that previous processor is not managing error correctly */
if(*error!=NULL) {
g_debug ("thrift_multiplexed_processor: last error not removed: %s",
*error != NULL ? (*error)->message : "(null)");
g_clear_error (error);
}
THRIFT_PROTOCOL_GET_CLASS(in)->read_message_begin(in, &fname, &message_type, &seqid, error);
if(!(message_type == T_CALL || message_type == T_ONEWAY)) {
g_set_error (error,
THRIFT_MULTIPLEXED_PROCESSOR_ERROR,
THRIFT_MULTIPLEXED_PROCESSOR_ERROR_MESSAGE_TYPE,
"message type invalid for this processor");
}else{
/* Split by the token */
for (token = strtok_r(fname, THRIFT_MULTIPLEXED_PROTOCOL_DEFAULT_SEPARATOR, &state),
token_index=0;
token != NULL && !token_error;
token = strtok_r(NULL, THRIFT_MULTIPLEXED_PROTOCOL_DEFAULT_SEPARATOR, &state),
token_index++)
{
switch(token_index){
case 0:
/* It should be the service name */
multiplexed_processor = g_hash_table_lookup(self->multiplexed_services, token);
if(multiplexed_processor==NULL){
token_error=TRUE;
}
break;
case 1:
/* It should be the function name */
stored_message_protocol = g_object_new (THRIFT_TYPE_STORED_MESSAGE_PROTOCOL,
"protocol", in,
"name", token,
"type", message_type,
"seqid", seqid,
NULL);
break;
default:
g_set_error (error,
THRIFT_MULTIPLEXED_PROCESSOR_ERROR,
THRIFT_MULTIPLEXED_PROCESSOR_ERROR_MESSAGE_WRONGLY_MULTIPLEXED,
"the message has more tokens than expected!");
token_error=TRUE;
break;
}
}
/* Set default */
if(!stored_message_protocol &&
!multiplexed_processor &&
token_index==1 && self->default_processor_name){
/* It should be the service name */
multiplexed_processor = g_hash_table_lookup(self->multiplexed_services, self->default_processor_name);
if(multiplexed_processor==NULL){
g_set_error (error,
THRIFT_MULTIPLEXED_PROCESSOR_ERROR,
THRIFT_MULTIPLEXED_PROCESSOR_ERROR_SERVICE_UNAVAILABLE,
"service %s not available on this processor",
self->default_processor_name);
}else{
/* Set the message name to the original name */
stored_message_protocol = g_object_new (THRIFT_TYPE_STORED_MESSAGE_PROTOCOL,
"protocol", in,
"name", fname,
"type", message_type,
"seqid", seqid,
NULL);
}
}
if(stored_message_protocol!=NULL && multiplexed_processor!=NULL){
retval = THRIFT_PROCESSOR_GET_CLASS (multiplexed_processor)->process (multiplexed_processor, (ThriftProtocol *) stored_message_protocol, out, error) ;
}else{
if(!error)
g_set_error (error,
THRIFT_MULTIPLEXED_PROCESSOR_ERROR,
THRIFT_MULTIPLEXED_PROCESSOR_ERROR_SERVICE_UNAVAILABLE,
"service %s is not multiplexed in this processor",
fname);
}
}
if(!retval){
/* By default, return an application exception to the client indicating the
method name is not recognized. */
/* Copied from dispach processor */
if ((thrift_protocol_skip (in, T_STRUCT, error) < 0) ||
(thrift_protocol_read_message_end (in, error) < 0))
return retval;
g_object_get (in, "transport", &transport, NULL);
result = thrift_transport_read_end (transport, error);
g_object_unref (transport);
if (result < 0) {
/* We must free fname */
g_free(fname);
return retval;
}
if (thrift_protocol_write_message_begin (out,
fname,
T_EXCEPTION,
seqid,
error) < 0){
/* We must free fname */
g_free(fname);
return retval;
}
xception =
g_object_new (THRIFT_TYPE_APPLICATION_EXCEPTION,
"type", THRIFT_APPLICATION_EXCEPTION_ERROR_UNKNOWN_METHOD,
"message", (*error)->message,
NULL);
result = thrift_struct_write (THRIFT_STRUCT (xception),
out,
error);
g_object_unref (xception);
if ((result < 0) ||
(thrift_protocol_write_message_end (out, error) < 0))
return retval;
g_object_get (out, "transport", &transport, NULL);
retval =
((thrift_transport_write_end (transport, error) >= 0) &&
(thrift_transport_flush (transport, error) >= 0));
g_object_unref (transport);
}else{
/* The protocol now has a copy we can free it */
g_free(fname);
}
/*
FIXME This makes everything fail, I don't know why.
if(stored_message_protocol!=NULL){
// After its use we must free it
g_object_unref(stored_message_protocol);
}
*/
return retval;
}
/* define the GError domain for Thrift transports */
GQuark
thrift_multiplexed_processor_error_quark (void)
{
return g_quark_from_static_string (THRIFT_MULTIPLEXED_PROCESSOR_ERROR_DOMAIN);
}
static void
thrift_multiplexed_processor_set_property (GObject *object,
guint property_id,
const GValue *value,
GParamSpec *pspec)
{
ThriftMultiplexedProcessor *self = THRIFT_MULTIPLEXED_PROCESSOR (object);
switch (property_id)
{
case PROP_THRIFT_MULTIPLEXED_PROCESSOR_DEFAULT_SERVICE_NAME:
self->default_processor_name = g_value_dup_string (value);
break;
default:
/* We don't have any other property... */
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
}
}
static void
thrift_multiplexed_processor_get_property (GObject *object,
guint property_id,
GValue *value,
GParamSpec *pspec)
{
ThriftMultiplexedProcessor *self = THRIFT_MULTIPLEXED_PROCESSOR (object);
switch (property_id)
{
case PROP_THRIFT_MULTIPLEXED_PROCESSOR_DEFAULT_SERVICE_NAME:
g_value_set_string (value, self->default_processor_name);
break;
default:
/* We don't have any other property... */
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
}
}
/* destructor */
static void
thrift_multiplexed_processor_finalize (GObject *object)
{
ThriftMultiplexedProcessor *self = THRIFT_MULTIPLEXED_PROCESSOR(object);
/* Free our multiplexed hash table */
g_hash_table_unref (self->multiplexed_services);
self->multiplexed_services = NULL;
if(self->default_processor_name){
g_free(self->default_processor_name);
self->default_processor_name=NULL;
}
/* Chain up to parent */
if (G_OBJECT_CLASS (thrift_multiplexed_processor_parent_class)->finalize)
(*G_OBJECT_CLASS (thrift_multiplexed_processor_parent_class)->finalize) (object);
}
/* class initializer for ThriftMultiplexedProcessor */
static void
thrift_multiplexed_processor_class_init (ThriftMultiplexedProcessorClass *cls)
{
/* Override */
THRIFT_PROCESSOR_CLASS(cls)->process = thrift_multiplexed_processor_process_impl;
GObjectClass *gobject_class = G_OBJECT_CLASS (cls);
/* Object methods */
gobject_class->set_property = thrift_multiplexed_processor_set_property;
gobject_class->get_property = thrift_multiplexed_processor_get_property;
gobject_class->finalize = thrift_multiplexed_processor_finalize;
/* Class methods */
cls->register_processor = thrift_multiplexed_processor_register_processor_impl;
thrift_multiplexed_processor_obj_properties[PROP_THRIFT_MULTIPLEXED_PROCESSOR_DEFAULT_SERVICE_NAME] =
g_param_spec_string ("default",
"Default service name the protocol points to where no multiplexed client used",
"Set the default service name",
NULL,
(G_PARAM_READWRITE));
g_object_class_install_properties (gobject_class,
PROP_THRIFT_MULTIPLEXED_PROCESSOR_END,
thrift_multiplexed_processor_obj_properties);
}
static void
thrift_multiplexed_processor_init (ThriftMultiplexedProcessor *self)
{
/* Create our multiplexed services hash table */
self->multiplexed_services = g_hash_table_new_full (
g_str_hash,
g_str_equal,
g_free,
g_object_unref);
self->default_processor_name = NULL;
}
gboolean
thrift_multiplexed_processor_register_processor(ThriftProcessor *processor, const gchar * multiplexed_processor_name, ThriftProcessor * multiplexed_processor , GError **error)
{
return THRIFT_MULTIPLEXED_PROCESSOR_GET_CLASS(processor)->register_processor(processor, multiplexed_processor_name, multiplexed_processor, error);
}