| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| #include <string.h> |
| #include <stdio.h> |
| |
| #include <thrift/c_glib/thrift.h> |
| #include <thrift/c_glib/processor/thrift_processor.h> |
| #include <thrift/c_glib/processor/thrift_multiplexed_processor.h> |
| #include <thrift/c_glib/protocol/thrift_multiplexed_protocol.h> |
| #include <thrift/c_glib/protocol/thrift_stored_message_protocol.h> |
| #include <thrift/c_glib/thrift_application_exception.h> |
| |
| G_DEFINE_TYPE(ThriftMultiplexedProcessor, thrift_multiplexed_processor, THRIFT_TYPE_PROCESSOR) |
| |
| |
| enum |
| { |
| PROP_THRIFT_MULTIPLEXED_PROCESSOR_DEFAULT_SERVICE_NAME = 1, |
| PROP_THRIFT_MULTIPLEXED_PROCESSOR_END |
| }; |
| |
| static GParamSpec *thrift_multiplexed_processor_obj_properties[PROP_THRIFT_MULTIPLEXED_PROCESSOR_END] = { NULL, }; |
| |
| |
| static gboolean |
| thrift_multiplexed_processor_register_processor_impl(ThriftProcessor *processor, const gchar * multiplexed_processor_name, ThriftProcessor * multiplexed_processor , GError **error) |
| { |
| ThriftMultiplexedProcessor *self = THRIFT_MULTIPLEXED_PROCESSOR(processor); |
| g_hash_table_replace(self->multiplexed_services, |
| g_strdup(multiplexed_processor_name), |
| g_object_ref (multiplexed_processor)); |
| |
| /* Make first registered become default */ |
| if(!self->default_processor_name){ |
| self->default_processor_name = g_strdup(multiplexed_processor_name); |
| } |
| return TRUE; |
| } |
| |
| |
| static gboolean |
| thrift_multiplexed_processor_process_impl (ThriftProcessor *processor, ThriftProtocol *in, |
| ThriftProtocol *out, GError **error) |
| { |
| gboolean retval = FALSE; |
| gboolean token_error = FALSE; |
| ThriftApplicationException *xception; |
| ThriftStoredMessageProtocol *stored_message_protocol = NULL; |
| ThriftMessageType message_type; |
| ThriftMultiplexedProcessor *self = THRIFT_MULTIPLEXED_PROCESSOR(processor); |
| ThriftProcessor *multiplexed_processor = NULL; |
| ThriftTransport *transport; |
| char *token=NULL; |
| int token_index=0; |
| char *state=NULL; |
| gchar *fname=NULL; |
| gint32 seqid, result; |
| |
| /* FIXME It seems that previous processor is not managing error correctly */ |
| if(*error!=NULL) { |
| g_debug ("thrift_multiplexed_processor: last error not removed: %s", |
| *error != NULL ? (*error)->message : "(null)"); |
| g_clear_error (error); |
| } |
| |
| |
| THRIFT_PROTOCOL_GET_CLASS(in)->read_message_begin(in, &fname, &message_type, &seqid, error); |
| |
| if(!(message_type == T_CALL || message_type == T_ONEWAY)) { |
| g_set_error (error, |
| THRIFT_MULTIPLEXED_PROCESSOR_ERROR, |
| THRIFT_MULTIPLEXED_PROCESSOR_ERROR_MESSAGE_TYPE, |
| "message type invalid for this processor"); |
| }else{ |
| /* Split by the token */ |
| for (token = strtok_r(fname, THRIFT_MULTIPLEXED_PROTOCOL_DEFAULT_SEPARATOR, &state), |
| token_index=0; |
| token != NULL && !token_error; |
| token = strtok_r(NULL, THRIFT_MULTIPLEXED_PROTOCOL_DEFAULT_SEPARATOR, &state), |
| token_index++) |
| { |
| switch(token_index){ |
| case 0: |
| /* It should be the service name */ |
| multiplexed_processor = g_hash_table_lookup(self->multiplexed_services, token); |
| if(multiplexed_processor==NULL){ |
| token_error=TRUE; |
| } |
| break; |
| case 1: |
| /* It should be the function name */ |
| stored_message_protocol = g_object_new (THRIFT_TYPE_STORED_MESSAGE_PROTOCOL, |
| "protocol", in, |
| "name", token, |
| "type", message_type, |
| "seqid", seqid, |
| NULL); |
| break; |
| default: |
| g_set_error (error, |
| THRIFT_MULTIPLEXED_PROCESSOR_ERROR, |
| THRIFT_MULTIPLEXED_PROCESSOR_ERROR_MESSAGE_WRONGLY_MULTIPLEXED, |
| "the message has more tokens than expected!"); |
| token_error=TRUE; |
| break; |
| } |
| } |
| /* Set default */ |
| if(!stored_message_protocol && |
| !multiplexed_processor && |
| token_index==1 && self->default_processor_name){ |
| /* It should be the service name */ |
| multiplexed_processor = g_hash_table_lookup(self->multiplexed_services, self->default_processor_name); |
| if(multiplexed_processor==NULL){ |
| g_set_error (error, |
| THRIFT_MULTIPLEXED_PROCESSOR_ERROR, |
| THRIFT_MULTIPLEXED_PROCESSOR_ERROR_SERVICE_UNAVAILABLE, |
| "service %s not available on this processor", |
| self->default_processor_name); |
| }else{ |
| /* Set the message name to the original name */ |
| stored_message_protocol = g_object_new (THRIFT_TYPE_STORED_MESSAGE_PROTOCOL, |
| "protocol", in, |
| "name", fname, |
| "type", message_type, |
| "seqid", seqid, |
| NULL); |
| } |
| |
| } |
| |
| if(stored_message_protocol!=NULL && multiplexed_processor!=NULL){ |
| retval = THRIFT_PROCESSOR_GET_CLASS (multiplexed_processor)->process (multiplexed_processor, (ThriftProtocol *) stored_message_protocol, out, error) ; |
| }else{ |
| if(!error) |
| g_set_error (error, |
| THRIFT_MULTIPLEXED_PROCESSOR_ERROR, |
| THRIFT_MULTIPLEXED_PROCESSOR_ERROR_SERVICE_UNAVAILABLE, |
| "service %s is not multiplexed in this processor", |
| fname); |
| } |
| |
| |
| } |
| |
| if(!retval){ |
| /* By default, return an application exception to the client indicating the |
| method name is not recognized. */ |
| /* Copied from dispach processor */ |
| |
| if ((thrift_protocol_skip (in, T_STRUCT, error) < 0) || |
| (thrift_protocol_read_message_end (in, error) < 0)) |
| return retval; |
| |
| g_object_get (in, "transport", &transport, NULL); |
| result = thrift_transport_read_end (transport, error); |
| g_object_unref (transport); |
| if (result < 0) { |
| /* We must free fname */ |
| g_free(fname); |
| return retval; |
| } |
| |
| if (thrift_protocol_write_message_begin (out, |
| fname, |
| T_EXCEPTION, |
| seqid, |
| error) < 0){ |
| /* We must free fname */ |
| g_free(fname); |
| |
| return retval; |
| } |
| |
| |
| xception = |
| g_object_new (THRIFT_TYPE_APPLICATION_EXCEPTION, |
| "type", THRIFT_APPLICATION_EXCEPTION_ERROR_UNKNOWN_METHOD, |
| "message", (*error)->message, |
| NULL); |
| result = thrift_struct_write (THRIFT_STRUCT (xception), |
| out, |
| error); |
| g_object_unref (xception); |
| if ((result < 0) || |
| (thrift_protocol_write_message_end (out, error) < 0)) |
| return retval; |
| |
| g_object_get (out, "transport", &transport, NULL); |
| retval = |
| ((thrift_transport_write_end (transport, error) >= 0) && |
| (thrift_transport_flush (transport, error) >= 0)); |
| g_object_unref (transport); |
| }else{ |
| /* The protocol now has a copy we can free it */ |
| g_free(fname); |
| |
| } |
| |
| /* |
| FIXME This makes everything fail, I don't know why. |
| if(stored_message_protocol!=NULL){ |
| // After its use we must free it |
| g_object_unref(stored_message_protocol); |
| } |
| */ |
| return retval; |
| } |
| |
| /* define the GError domain for Thrift transports */ |
| GQuark |
| thrift_multiplexed_processor_error_quark (void) |
| { |
| return g_quark_from_static_string (THRIFT_MULTIPLEXED_PROCESSOR_ERROR_DOMAIN); |
| } |
| |
| |
| static void |
| thrift_multiplexed_processor_set_property (GObject *object, |
| guint property_id, |
| const GValue *value, |
| GParamSpec *pspec) |
| { |
| ThriftMultiplexedProcessor *self = THRIFT_MULTIPLEXED_PROCESSOR (object); |
| |
| switch (property_id) |
| { |
| case PROP_THRIFT_MULTIPLEXED_PROCESSOR_DEFAULT_SERVICE_NAME: |
| self->default_processor_name = g_value_dup_string (value); |
| break; |
| |
| default: |
| /* We don't have any other property... */ |
| G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); |
| break; |
| } |
| } |
| |
| static void |
| thrift_multiplexed_processor_get_property (GObject *object, |
| guint property_id, |
| GValue *value, |
| GParamSpec *pspec) |
| { |
| ThriftMultiplexedProcessor *self = THRIFT_MULTIPLEXED_PROCESSOR (object); |
| |
| switch (property_id) |
| { |
| case PROP_THRIFT_MULTIPLEXED_PROCESSOR_DEFAULT_SERVICE_NAME: |
| g_value_set_string (value, self->default_processor_name); |
| break; |
| |
| default: |
| /* We don't have any other property... */ |
| G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); |
| break; |
| } |
| } |
| |
| /* destructor */ |
| static void |
| thrift_multiplexed_processor_finalize (GObject *object) |
| { |
| ThriftMultiplexedProcessor *self = THRIFT_MULTIPLEXED_PROCESSOR(object); |
| |
| /* Free our multiplexed hash table */ |
| g_hash_table_unref (self->multiplexed_services); |
| self->multiplexed_services = NULL; |
| |
| if(self->default_processor_name){ |
| g_free(self->default_processor_name); |
| self->default_processor_name=NULL; |
| } |
| |
| /* Chain up to parent */ |
| if (G_OBJECT_CLASS (thrift_multiplexed_processor_parent_class)->finalize) |
| (*G_OBJECT_CLASS (thrift_multiplexed_processor_parent_class)->finalize) (object); |
| } |
| |
| /* class initializer for ThriftMultiplexedProcessor */ |
| static void |
| thrift_multiplexed_processor_class_init (ThriftMultiplexedProcessorClass *cls) |
| { |
| /* Override */ |
| THRIFT_PROCESSOR_CLASS(cls)->process = thrift_multiplexed_processor_process_impl; |
| GObjectClass *gobject_class = G_OBJECT_CLASS (cls); |
| |
| /* Object methods */ |
| gobject_class->set_property = thrift_multiplexed_processor_set_property; |
| gobject_class->get_property = thrift_multiplexed_processor_get_property; |
| gobject_class->finalize = thrift_multiplexed_processor_finalize; |
| |
| /* Class methods */ |
| cls->register_processor = thrift_multiplexed_processor_register_processor_impl; |
| |
| |
| thrift_multiplexed_processor_obj_properties[PROP_THRIFT_MULTIPLEXED_PROCESSOR_DEFAULT_SERVICE_NAME] = |
| g_param_spec_string ("default", |
| "Default service name the protocol points to where no multiplexed client used", |
| "Set the default service name", |
| NULL, |
| (G_PARAM_READWRITE)); |
| |
| g_object_class_install_properties (gobject_class, |
| PROP_THRIFT_MULTIPLEXED_PROCESSOR_END, |
| thrift_multiplexed_processor_obj_properties); |
| |
| } |
| |
| static void |
| thrift_multiplexed_processor_init (ThriftMultiplexedProcessor *self) |
| { |
| |
| /* Create our multiplexed services hash table */ |
| self->multiplexed_services = g_hash_table_new_full ( |
| g_str_hash, |
| g_str_equal, |
| g_free, |
| g_object_unref); |
| self->default_processor_name = NULL; |
| } |
| |
| |
| gboolean |
| thrift_multiplexed_processor_register_processor(ThriftProcessor *processor, const gchar * multiplexed_processor_name, ThriftProcessor * multiplexed_processor , GError **error) |
| { |
| return THRIFT_MULTIPLEXED_PROCESSOR_GET_CLASS(processor)->register_processor(processor, multiplexed_processor_name, multiplexed_processor, error); |
| } |
| |
| |