blob: b355abb28d5703e7ccc0fd5b46efea3fcc91f1f2 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using log4net;
using Apache.Qpid.Messaging;
using Apache.Qpid.Client.Qms;
namespace Apache.Qpid.Client.Tests.interop
{
public class TopicListener
{
private static ILog log = LogManager.GetLogger(typeof(TopicListener));
/// <summary> The default AMQ connection URL to use for tests. </summary>
const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";
/// <summary> Holds the routing key for the topic to receive test messages on. </summary>
public static string CONTROL_ROUTING_KEY = "topic_control";
/// <summary> Holds the routing key for the queue to send reports to. </summary>
public static string RESPONSE_ROUTING_KEY = "response";
/// <summary> Holds the connection to listen on. </summary>
private IConnection connection;
/// <summary> Holds the channel for all test messages.</summary>
private IChannel channel;
/// <summary> Holds the producer to send report messages on. </summary>
private IMessagePublisher publisher;
/// <summary> Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. </summary> */
private bool init;
/// <summary> Holds the count of messages received by this listener. </summary> */
private int count;
/// <summary> Creates a topic listener using the specified broker URL. </summary>
///
/// <param name="connectionUri">The broker URL to listen on.</param>
TopicListener(string connectionUri)
{
log.Debug("TopicListener(string connectionUri = " + connectionUri + "): called");
// Create a connection to the broker.
IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(connectionUri);
connection = new AMQConnection(connectionInfo);
// Establish a session on the broker.
channel = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);
// Set up a queue to listen for test messages on.
string topicQueueName = channel.GenerateUniqueName();
channel.DeclareQueue(topicQueueName, false, true, true);
// Set this listener up to listen for incoming messages on the test topic queue.
channel.Bind(topicQueueName, ExchangeNameDefaults.TOPIC, CONTROL_ROUTING_KEY);
IMessageConsumer consumer = channel.CreateConsumerBuilder(topicQueueName)
.Create();
consumer.OnMessage += new MessageReceivedDelegate(OnMessage);
// Set up this listener with a producer to send the reports on.
publisher = channel.CreatePublisherBuilder()
.WithExchangeName(ExchangeNameDefaults.DIRECT)
.WithRoutingKey(RESPONSE_ROUTING_KEY)
.Create();
connection.Start();
Console.WriteLine("Waiting for messages...");
}
public static void Main(String[] argv)
{
// Create an instance of this listener with the command line parameters.
new TopicListener(DEFAULT_URI);
}
/// <summary>
/// Handles all message received by this listener. Test messages are counted, report messages result in a report being sent and
/// shutdown messages result in this listener being terminated.
/// </summary>
///
/// <param name="message">The received message.</param>
public void OnMessage(IMessage message)
{
log.Debug("public void onMessage(Message message = " + message + "): called");
// Take the start time of the first message if this is the first message.
if (!init)
{
count = 0;
init = true;
}
// Check if the message is a control message telling this listener to shut down.
if (IsShutdown(message))
{
log.Debug("Got a shutdown message.");
Shutdown();
}
// Check if the message is a report request message asking this listener to respond with the message count.
else if (IsReport(message))
{
log.Debug("Got a report request message.");
// Send the message count report.
SendReport();
// Reset the initialization flag so that the next message is considered to be the first.
init = false;
}
// Otherwise it is an ordinary test message, so increment the message count.
else
{
count++;
}
}
/// <summary> Checks a message to see if it is a shutdown control message. </summary>
///
/// <param name="m">The message to check.</param>
///
/// <returns><tt>true</tt> if it is a shutdown control message, <tt>false</tt> otherwise.</returns>
private bool IsShutdown(IMessage m)
{
bool result = CheckTextField(m, "TYPE", "TERMINATION_REQUEST");
//log.Debug("isShutdown = " + result);
return result;
}
/// <summary> Checks a message to see if it is a report request control message. </summary>
///
/// <param name="m">The message to check.</param>
///
/// <returns><tt>true</tt> if it is a report request control message, <tt>false</tt> otherwise.</returns>
private bool IsReport(IMessage m)
{
bool result = CheckTextField(m, "TYPE", "REPORT_REQUEST");
//log.Debug("isReport = " + result);
return result;
}
/// <summary> Checks whether or not a text field on a message has the specified value. </summary>
///
/// <param name="e">The message to check.</param>
/// <param name="e">The name of the field to check.</param>
/// <param name="e">The expected value of the field to compare with.</param>
///
/// <returns> <tt>true</tt>If the specified field has the specified value, <tt>fals</tt> otherwise. </returns>
private static bool CheckTextField(IMessage m, string fieldName, string value)
{
/*log.Debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName
+ ", String value = " + value + "): called");*/
string comp = m.Headers.GetString(fieldName);
return (comp != null) && comp == value;
}
/// <summary> Stops the message consumer and closes the connection. </summary>
private void Shutdown()
{
connection.Stop();
channel.Dispose();
connection.Dispose();
}
/// <summary> Sends the report message to the response location. </summary>
private void SendReport()
{
string report = "Received " + count + ".";
IMessage reportMessage = channel.CreateTextMessage(report);
reportMessage.Headers.SetBoolean("BOOLEAN", false);
//reportMessage.Headers.SetByte("BYTE", 5);
reportMessage.Headers.SetDouble("DOUBLE", 3.141);
reportMessage.Headers.SetFloat("FLOAT", 1.0f);
reportMessage.Headers.SetInt("INT", 1);
reportMessage.Headers.SetLong("LONG", 1);
reportMessage.Headers.SetString("STRING", "hello");
reportMessage.Headers.SetShort("SHORT", 2);
publisher.Send(reportMessage);
Console.WriteLine("Sent report: " + report);
}
}
}