blob: 70d422cf16fdfc8e8e09666329c2997e500f53d8 [file] [log] [blame]
/*
* File: CvRabbitMq.cpp
* Author: mony
*
* Created on August 31, 2012, 12:11 PM
*/
#include "CvRabbitMq.h"
#include "CvLogger.h"
#include <amqp_framing.h>
#include <stdlib.h>
#include <memory.h>
// File-level definitions of the "empty" AMQP values: each is initialised with a
// zero first field and a NULL pointer (for amqp_bytes_t that is a zero length and
// no data).
// NOTE(review): presumably these provide the storage for symbols that the
// rabbitmq-c headers only declare - confirm against the library version in use,
// since newer releases may define them in the library itself.
const amqp_table_t amqp_empty_table = { 0, NULL };
const amqp_bytes_t amqp_empty_bytes = { 0, NULL };
// Make the names from the CvShared namespace visible to the rest of this file.
using namespace CvShared;
/**
 * Builds an unconnected wrapper bound to the given queue name.
 *
 * The object starts with port 0, no connection handle (NULL) and channel
 * number 1; nothing is opened here.
 *
 * @param aQueueName  Name stored in m_name and later used as the queue /
 *                    routing key by Read() and Write().
 */
CvRabbitMq::CvRabbitMq( const String& aQueueName )
    : m_name( aQueueName )
    , m_port( 0 )
    , m_amqpConn( NULL )
    , m_channel( 1 )
{
}
/**
 * Destructor: shuts the session down through Disconnect().
 * The outcome of Disconnect() is deliberately not inspected here.
 */
CvRabbitMq::~CvRabbitMq()
{
    static_cast<void>( Disconnect() );
}
bool CvRabbitMq::Connect( const String& aHost, u_short aPort, const String& aUser, const String& aPassword )
{
if ( m_amqpConn != NULL || !m_host.empty() )
Disconnect();
m_host = aHost;
m_port = aPort;
m_user = aUser;
m_password = aPassword;
m_amqpConn = amqp_new_connection();
int sockfd = amqp_open_socket( m_host.c_str(), m_port );
if ( CheckForError( sockfd, "Opening socket" ) )
return false;
amqp_set_sockfd( m_amqpConn, sockfd );
amqp_rpc_reply_t reply;
reply = amqp_login( m_amqpConn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, m_user.c_str(), m_password.c_str() );
if ( CheckForError( reply, "Logging in" ) )
return false;
amqp_channel_open( m_amqpConn, m_channel );
if ( CheckForError( m_amqpConn, "Opening channel" ) )
return false;
return true;
}
/**
 * Closes the channel and the connection, then destroys the connection handle.
 *
 * The handle is always destroyed and the cached connection parameters are
 * always cleared, even when one of the close steps fails. Previously a failed
 * close returned early and left the handle allocated with no way to release
 * it, and a failing amqp_destroy_connection left m_amqpConn pointing at a
 * handle that had already been destroyed.
 *
 * @return true if there was nothing to disconnect or every step succeeded;
 *         false if any step reported an error (each failure is logged). In
 *         both cases the object is left in the disconnected state.
 */
bool CvRabbitMq::Disconnect()
{
    if ( m_amqpConn == NULL )
        return true;

    // Orderly protocol shutdown; stop sending further close requests after the
    // first failure, exactly as before.
    amqp_rpc_reply_t reply = amqp_channel_close( m_amqpConn, m_channel, AMQP_REPLY_SUCCESS );
    bool bOk = !CheckForError( reply, "Closing channel" );
    if ( bOk )
    {
        reply = amqp_connection_close( m_amqpConn, AMQP_REPLY_SUCCESS );
        bOk = !CheckForError( reply, "Closing connection" );
    }

    // Always release the handle. It must not be touched again after this call,
    // whatever the call returns, so the member is reset unconditionally.
    const int result = amqp_destroy_connection( m_amqpConn );
    if ( CheckForError( result, "Ending connection" ) )
        bOk = false;
    m_amqpConn = NULL;

    m_host.clear();
    m_port = 0;
    m_user.clear();
    m_password.clear();
    return bOk;
}
/**
 * Fetches one message from the queue named m_name by polling with basic.get.
 *
 * While the queue is empty the call sleeps and retries: in 5 second steps for
 * TIMEOUT_INFINITE, otherwise in steps of at most 1 second that are subtracted
 * from the remaining timeout. Once a message is available its header frame and
 * all body frames are read, and as much of the body as fits is copied to apBuf.
 *
 * NOTE(review): the last argument to amqp_basic_get (TRUE) appears to be the
 * no_ack flag; if so, a message that does not fit in apBuf is consumed anyway
 * and its tail is lost - confirm.
 * NOTE(review): the countdown only terminates if TIMEOUT_NO_WAIT equals 0 -
 * confirm against its definition.
 *
 * @param apBuf     Destination buffer for the message body.
 * @param aMaxLen   Capacity of apBuf in bytes.
 * @param aReadLen  Receives the number of bytes copied into apBuf (reset to 0
 *                  on entry; may be non-zero even when false is returned).
 * @param aTimeout  How long to wait for a message, in milliseconds, or one of
 *                  TIMEOUT_NO_WAIT / TIMEOUT_INFINITE.
 * @return true if a complete message was copied; false if not connected, on a
 *         library/server error, on timeout, on an unexpected frame type, or if
 *         the body was larger than aMaxLen (truncated).
 */
bool CvRabbitMq::Read( OUT uint8_t* apBuf, size_t aMaxLen, OUT size_t& aReadLen, const Millisecs& aTimeout )
{
    aReadLen = 0;
    if ( m_amqpConn == NULL )
    {
        LogMessage( enLogLevel_Error, "ERROR in RabbitMQ - Read: Connection is NULL" );
        return false;
    }

    time_t timeout = aTimeout.Value();

    // Poll until basic.get returns something other than get-empty.
    for ( ;; )
    {
        amqp_maybe_release_buffers( m_amqpConn );
        amqp_rpc_reply_t reply = amqp_basic_get( m_amqpConn, m_channel, amqp_cstring_bytes( m_name.c_str() ), TRUE );
        if ( CheckForError( reply, "Basic get" ) )
            return false;
        if ( reply.reply.id != AMQP_BASIC_GET_EMPTY_METHOD )
            break;

        // No message was present.
        if ( timeout == TIMEOUT_NO_WAIT )
            return false;
        if ( timeout == TIMEOUT_INFINITE )
        {
            SleepFor( Seconds(5) );
        }
        else
        {
            // Sleep for one second or whatever is left, whichever is shorter.
            time_t timeToSleep = Seconds(1).ToMillisecs();
            if ( timeout < timeToSleep )
                timeToSleep = timeout;
            SleepFor( timeToSleep );
            timeout -= timeToSleep;
        }
    }

    // The content header follows the get-ok reply and carries the body size.
    amqp_frame_t frame;
    int result = amqp_simple_wait_frame( m_amqpConn, &frame );
    if ( CheckForError( result, "Simple wait (Header)" ) )
        return false;
    if ( frame.frame_type != AMQP_FRAME_HEADER )
    {
        LogMessage( enLogLevel_Error, "ERROR in RabbitMQ - Read: Expected header!" );
        return false;
    }

    const size_t bodySize = frame.payload.properties.body_size;
    size_t bodyReceived = 0;
    bool bNoSpace = false;

    // Every body frame is consumed, even once the buffer is full, so the whole
    // message is drained from the connection.
    while ( bodyReceived < bodySize )
    {
        result = amqp_simple_wait_frame( m_amqpConn, &frame );
        if ( CheckForError( result, "Simple wait (Body)" ) )
            return false;
        if ( frame.frame_type != AMQP_FRAME_BODY )
        {
            LogMessage( enLogLevel_Error, "ERROR in RabbitMQ - Read: Expected body!" );
            return false;
        }

        const size_t fragmentSize = frame.payload.body_fragment.len;
        // aReadLen never exceeds aMaxLen, so this subtraction cannot wrap; comparing
        // against the remaining space also avoids overflow in aReadLen + fragmentSize.
        const size_t spaceLeft = aMaxLen - aReadLen;
        const size_t readSize = ( fragmentSize < spaceLeft ) ? fragmentSize : spaceLeft;

        // Truncation is sticky: a later (possibly empty) fragment must not clear it.
        if ( readSize < fragmentSize )
            bNoSpace = true;

        // Skip the copy when nothing fits; apBuf may legitimately be NULL when aMaxLen is 0.
        if ( readSize > 0 )
            memcpy( &apBuf[aReadLen], frame.payload.body_fragment.bytes, readSize );

        bodyReceived += fragmentSize;
        aReadLen += readSize;
    }

    return !bNoSpace;
}
bool CvRabbitMq::Write( const uint8_t* apData, size_t aLen )
{
amqp_basic_properties_t props;
memset(&props, 0, sizeof props);
props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG;
props.delivery_mode = 2; // persistent delivery mode
// props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
// props.content_type = amqp_cstring_bytes("text/plain");
amqp_bytes_t message_bytes = { aLen, (void*)apData };
int result = amqp_basic_publish( m_amqpConn, m_channel,
amqp_cstring_bytes(""),
amqp_cstring_bytes( m_name.c_str() ),
FALSE,
FALSE,
&props,
message_bytes );
if ( CheckForError( result, "Publishing" ) )
return false;
return true;
}
bool CvRabbitMq::CheckForError( int aRetVal, const char* aContext )
{
if ( aRetVal >= 0 )
return false;
char* errstr = amqp_error_string(-aRetVal);
LogMessage( enLogLevel_Error, "ERROR in RabbitMQ - %s: %s", aContext, errstr );
free(errstr);
return true;
}
/**
 * Interprets an RPC reply from the AMQP library and logs any failure.
 *
 * AMQP_RESPONSE_NORMAL is the only reply type treated as success. A missing
 * reply, a library exception and a server exception (connection close,
 * channel close, or any other method id) each produce their own log line.
 * Any other reply type is reported as an error without logging.
 *
 * @param aReply    Reply structure to inspect.
 * @param aContext  Short description of the operation, used in the log line.
 * @return true if the reply represents an error, false for a normal reply.
 */
bool CvRabbitMq::CheckForError( amqp_rpc_reply_t& aReply, const char* aContext )
{
    if ( aReply.reply_type == AMQP_RESPONSE_NORMAL )
        return false;

    if ( aReply.reply_type == AMQP_RESPONSE_NONE )
    {
        LogMessage( enLogLevel_Error, "ERROR in RabbitMQ - %s: missing RPC reply type!", aContext );
    }
    else if ( aReply.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION )
    {
        // The error text is heap-allocated by the library and released here.
        char* description = amqp_error_string( aReply.library_error );
        LogMessage( enLogLevel_Error, "ERROR in RabbitMQ - %s: %s", aContext, description );
        free( description );
    }
    else if ( aReply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION )
    {
        if ( aReply.reply.id == AMQP_CONNECTION_CLOSE_METHOD )
        {
            amqp_connection_close_t* closeInfo = static_cast<amqp_connection_close_t*>( aReply.reply.decoded );
            // reply_text is length-delimited, hence the %.*s with an explicit length.
            LogMessage( enLogLevel_Error, "ERROR in RabbitMQ - %s: server connection error %d, message: %.*s",
                        aContext, closeInfo->reply_code, (int)closeInfo->reply_text.len,
                        static_cast<char*>( closeInfo->reply_text.bytes ) );
        }
        else if ( aReply.reply.id == AMQP_CHANNEL_CLOSE_METHOD )
        {
            amqp_channel_close_t* closeInfo = static_cast<amqp_channel_close_t*>( aReply.reply.decoded );
            LogMessage( enLogLevel_Error, "ERROR in RabbitMQ - %s: server channel error %d, message: %.*s",
                        aContext, closeInfo->reply_code, (int)closeInfo->reply_text.len,
                        static_cast<char*>( closeInfo->reply_text.bytes ) );
        }
        else
        {
            LogMessage( enLogLevel_Error, "ERROR in RabbitMQ - %s: unknown server error, method id 0x%08X",
                        aContext, aReply.reply.id );
        }
    }

    // Everything that is not a normal response counts as an error.
    return true;
}
/**
 * Checks the most recent RPC reply recorded on a connection.
 *
 * Fetches the reply with amqp_get_rpc_reply and hands it to the
 * amqp_rpc_reply_t overload of CheckForError.
 *
 * @param aConn     Connection whose last RPC reply is inspected.
 * @param aContext  Short description of the operation, used in the log line.
 * @return true if that reply represents an error, false otherwise.
 */
bool CvRabbitMq::CheckForError( amqp_connection_state_t& aConn, const char* aContext )
{
    // Kept as a named local: the reply overload takes a non-const reference.
    amqp_rpc_reply_t lastReply = amqp_get_rpc_reply( aConn );
    return CheckForError( lastReply, aContext );
}