blob: 75caa49095b6f385c48c466de1a393923d8d09ee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* tcp://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/SubscriptionManager.h>
#include <qpid/sys/Time.h>
#include <axis2_amqp_defines.h>
#include <axiom_mime_part.h>
#include <axis2_qpid_sender.h>
#include <fstream>
using namespace std;
using namespace qpid::client;
using namespace qpid::framing;
Axis2QpidSender::Axis2QpidSender(string qpidBrokerIP, int qpidBrokerPort, const axutil_env_t* env)
{
this->qpidBrokerIP = qpidBrokerIP;
this->qpidBrokerPort = qpidBrokerPort;
this->env = env;
this->responseContent = "";
this->responseContentType = "";
}
Axis2QpidSender::~Axis2QpidSender(void)
{}
bool Axis2QpidSender::SendReceive(string messageContent, string toQueueName, bool isSOAP11,
string contentType, string soapAction, axutil_array_list_t* mime_parts, int timeout)
{
bool status = false;
this->responseContent = "";
this->responseContentType = "";
try
{
Connection connection;
connection.open(qpidBrokerIP, qpidBrokerPort);
Session session = connection.newSession();
/* Declare Private Queue */
string replyToQueueName = AXIS2_AMQP_TEMP_QUEUE_NAME_PREFIX;
replyToQueueName.append(axutil_uuid_gen(env));
session.queueDeclare(arg::queue = replyToQueueName, arg::autoDelete = true);
session.exchangeBind(arg::exchange = AXIS2_AMQP_EXCHANGE_DIRECT,
arg::queue = replyToQueueName,
arg::bindingKey = replyToQueueName);
/* Create Message */
Message reqMessage;
reqMessage.getDeliveryProperties().setRoutingKey(toQueueName);
reqMessage.getMessageProperties().setReplyTo(ReplyTo(AXIS2_AMQP_EXCHANGE_DIRECT, replyToQueueName));
reqMessage.getHeaders().setString(AXIS2_AMQP_HEADER_CONTENT_TYPE, contentType);
reqMessage.getHeaders().setString(AXIS2_AMQP_HEADER_SOAP_ACTION, soapAction);
if (mime_parts)
{
string mimeBody;
GetMimeBody(mime_parts, mimeBody);
messageContent.clear();/* MIME parts include SOAP envelop */
messageContent.append(mimeBody);
}
reqMessage.setData(messageContent);
async(session).messageTransfer(arg::content = reqMessage, arg::destination = AXIS2_AMQP_EXCHANGE_DIRECT);
/* Create subscription manager */
SubscriptionManager subscriptionManager(session);
Message resMessage;
qpid::sys::Duration reqTimeout(timeout * AXIS2_AMQP_NANOSEC_PER_MILLISEC);
if (subscriptionManager.get(resMessage, replyToQueueName, reqTimeout))
{
responseContent = resMessage.getData();
responseContentType = resMessage.getHeaders().getString(AXIS2_AMQP_HEADER_CONTENT_TYPE);
status = true;
}
connection.close();
}
catch (const std::exception& e)
{
}
return status;
}
bool Axis2QpidSender::Send(string messageContent, string toQueueName, string replyToQueueName,
bool isSOAP11, string contentType, string soapAction, axutil_array_list_t* mime_parts)
{
bool status = false;
try
{
Connection connection;
connection.open(qpidBrokerIP, qpidBrokerPort);
Session session = connection.newSession();
/* Create Message */
Message message;
message.getDeliveryProperties().setRoutingKey(toQueueName);
if (!replyToQueueName.empty()) /* Client dual-channel */
{
message.getMessageProperties().setReplyTo(ReplyTo(AXIS2_AMQP_EXCHANGE_DIRECT, replyToQueueName));
session.queueDeclare(arg::queue = replyToQueueName);
session.exchangeBind(arg::exchange = AXIS2_AMQP_EXCHANGE_DIRECT,
arg::queue = replyToQueueName,
arg::bindingKey = replyToQueueName);
}
message.getHeaders().setString(AXIS2_AMQP_HEADER_CONTENT_TYPE, contentType);
message.getHeaders().setString(AXIS2_AMQP_HEADER_SOAP_ACTION, soapAction);
if (mime_parts)
{
string mimeBody;
GetMimeBody(mime_parts, mimeBody);
messageContent.clear();/* MIME parts include SOAP envelop */
messageContent.append(mimeBody);
}
message.setData(messageContent);
async(session).messageTransfer(arg::content = message, arg::destination = AXIS2_AMQP_EXCHANGE_DIRECT);
connection.close();
status = true;
}
catch (const std::exception& e)
{
}
return status;
}
void Axis2QpidSender::GetMimeBody(axutil_array_list_t* mime_parts, string& mimeBody)
{
int i = 0;
axiom_mime_part_t *mime_part = NULL;
axis2_status_t status = AXIS2_SUCCESS;
if (!mime_parts)
return;
for (i = 0; i < axutil_array_list_size(mime_parts, env); i++)
{
mime_part = (axiom_mime_part_t *)axutil_array_list_get(mime_parts, env, i);
if (mime_part->type == AXIOM_MIME_PART_BUFFER)
{
mimeBody.append(mime_part->part, mime_part->part_size);
}
else if (mime_part->type == AXIOM_MIME_PART_FILE)
{
int length;
char* buffer;
ifstream file;
file.open(mime_part->file_name, ios::binary);
file.seekg(0, ios::end);
length = file.tellg();
file.seekg(0, ios::beg);
buffer = new char[length];
file.read(buffer, length);
file.close();
mimeBody.append(buffer, length);
delete [] buffer;
}
else
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Unknown mime type");
return;
}
if (status == AXIS2_FAILURE)
{
break;
}
}
}