| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <qpid/client/Connection.h> |
| #include <qpid/client/Session.h> |
| #include <qpid/client/Message.h> |
| #include <qpid/client/SubscriptionManager.h> |
| #include <qpid/sys/Time.h> |
| #include <axis2_amqp_defines.h> |
| #include <axiom_mime_part.h> |
| #include <axis2_qpid_sender.h> |
| #include <fstream> |
| |
| using namespace std; |
| using namespace qpid::client; |
| using namespace qpid::framing; |
| |
| Axis2QpidSender::Axis2QpidSender( |
| string qpidBrokerIP, |
| int qpidBrokerPort, |
| const axutil_env_t* env) |
| { |
| this->qpidBrokerIP = qpidBrokerIP; |
| this->qpidBrokerPort = qpidBrokerPort; |
| this->env = env; |
| this->responseContent = ""; |
| this->responseContentType = ""; |
| } |
| |
| Axis2QpidSender::~Axis2QpidSender( |
| void) |
| { |
| } |
| |
| bool |
| Axis2QpidSender::SendReceive( |
| string messageContent, |
| string toQueueName, |
| bool isSOAP11, |
| string contentType, |
| string soapAction, |
| axutil_array_list_t* mime_parts, |
| int timeout) |
| { |
| bool status = false; |
| this->responseContent = ""; |
| this->responseContentType = ""; |
| |
| try |
| { |
| Connection connection; |
| connection.open(qpidBrokerIP, qpidBrokerPort); |
| |
| Session session = connection.newSession(); |
| |
| /* Declare Private Queue */ |
| string replyToQueueName = AXIS2_AMQP_TEMP_QUEUE_NAME_PREFIX; |
| replyToQueueName.append(axutil_uuid_gen(env)); |
| |
| session.queueDeclare(arg::queue = replyToQueueName, arg::autoDelete = true); |
| session.exchangeBind(arg::exchange = AXIS2_AMQP_EXCHANGE_DIRECT, arg::queue |
| = replyToQueueName, arg::bindingKey = replyToQueueName); |
| |
| /* Create Message */ |
| Message reqMessage; |
| |
| reqMessage.getDeliveryProperties().setRoutingKey(toQueueName); |
| reqMessage.getMessageProperties().setReplyTo(ReplyTo(AXIS2_AMQP_EXCHANGE_DIRECT, |
| replyToQueueName)); |
| |
| reqMessage.getHeaders().setString(AXIS2_AMQP_HEADER_CONTENT_TYPE, contentType); |
| reqMessage.getHeaders().setString(AXIS2_AMQP_HEADER_SOAP_ACTION, soapAction); |
| |
| if(mime_parts) |
| { |
| string mimeBody; |
| GetMimeBody(mime_parts, mimeBody); |
| |
| messageContent.clear();/* MIME parts include SOAP envelop */ |
| |
| messageContent.append(mimeBody); |
| } |
| |
| reqMessage.setData(messageContent); |
| |
| session.messageTransfer(arg::content = reqMessage, arg::destination |
| = AXIS2_AMQP_EXCHANGE_DIRECT); |
| |
| /* Create subscription manager */ |
| SubscriptionManager subscriptionManager(session); |
| |
| Message resMessage; |
| qpid::sys::Duration reqTimeout(timeout * AXIS2_AMQP_NANOSEC_PER_MILLISEC); |
| |
| if(subscriptionManager.get(resMessage, replyToQueueName, reqTimeout)) |
| { |
| responseContent = resMessage.getData(); |
| responseContentType = resMessage.getHeaders().getAsString( |
| AXIS2_AMQP_HEADER_CONTENT_TYPE); |
| |
| status = true; |
| } |
| |
| connection.close(); |
| } |
| catch(const std::exception& e) |
| { |
| } |
| |
| return status; |
| } |
| |
| bool |
| Axis2QpidSender::Send( |
| string messageContent, |
| string toQueueName, |
| string replyToQueueName, |
| bool isSOAP11, |
| string contentType, |
| string soapAction, |
| axutil_array_list_t* mime_parts) |
| { |
| bool status = false; |
| |
| try |
| { |
| Connection connection; |
| connection.open(qpidBrokerIP, qpidBrokerPort); |
| |
| Session session = connection.newSession(); |
| |
| /* Create Message */ |
| Message message; |
| |
| message.getDeliveryProperties().setRoutingKey(toQueueName); |
| |
| if(!replyToQueueName.empty()) /* Client dual-channel */ |
| { |
| message.getMessageProperties().setReplyTo(ReplyTo(AXIS2_AMQP_EXCHANGE_DIRECT, |
| replyToQueueName)); |
| |
| session.queueDeclare(arg::queue = replyToQueueName); |
| session.exchangeBind(arg::exchange = AXIS2_AMQP_EXCHANGE_DIRECT, arg::queue |
| = replyToQueueName, arg::bindingKey = replyToQueueName); |
| } |
| |
| message.getHeaders().setString(AXIS2_AMQP_HEADER_CONTENT_TYPE, contentType); |
| message.getHeaders().setString(AXIS2_AMQP_HEADER_SOAP_ACTION, soapAction); |
| |
| if(mime_parts) |
| { |
| string mimeBody; |
| GetMimeBody(mime_parts, mimeBody); |
| |
| messageContent.clear();/* MIME parts include SOAP envelop */ |
| |
| messageContent.append(mimeBody); |
| } |
| |
| message.setData(messageContent); |
| |
| session.messageTransfer(arg::content = message, arg::destination |
| = AXIS2_AMQP_EXCHANGE_DIRECT); |
| |
| connection.close(); |
| |
| status = true; |
| } |
| catch(const std::exception& e) |
| { |
| } |
| |
| return status; |
| } |
| |
| void |
| Axis2QpidSender::GetMimeBody( |
| axutil_array_list_t* mime_parts, |
| string& mimeBody) |
| { |
| int i = 0; |
| axiom_mime_part_t *mime_part = NULL; |
| axis2_status_t status = AXIS2_SUCCESS; |
| |
| if(!mime_parts) |
| return; |
| |
| for(i = 0; i < axutil_array_list_size(mime_parts, env); i++) |
| { |
| mime_part = (axiom_mime_part_t *)axutil_array_list_get(mime_parts, env, i); |
| |
| if(mime_part->type == AXIOM_MIME_PART_BUFFER) |
| { |
| mimeBody.append(mime_part->part, mime_part->part_size); |
| } |
| else if(mime_part->type == AXIOM_MIME_PART_FILE) |
| { |
| int length; |
| char* buffer; |
| |
| ifstream file; |
| file.open(mime_part->file_name, ios::binary); |
| |
| file.seekg(0, ios::end); |
| length = file.tellg(); |
| file.seekg(0, ios::beg); |
| |
| buffer = new char[length]; |
| |
| file.read(buffer, length); |
| file.close(); |
| |
| mimeBody.append(buffer, length); |
| |
| delete[] buffer; |
| } |
| else |
| { |
| AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Unknown mime type"); |
| return; |
| } |
| |
| if(status == AXIS2_FAILURE) |
| { |
| break; |
| } |
| } |
| } |