| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| using System; |
| using log4net; |
| using Apache.Qpid.Messaging; |
| using Apache.Qpid.Client.Qms; |
| |
| namespace Apache.Qpid.Client.Tests.interop |
| { |
| public class TopicListener |
| { |
| private static ILog log = LogManager.GetLogger(typeof(TopicListener)); |
| |
| /// <summary> The default AMQ connection URL to use for tests. </summary> |
| const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'"; |
| |
| /// <summary> Holds the routing key for the topic to receive test messages on. </summary> |
| public static string CONTROL_ROUTING_KEY = "topic_control"; |
| |
| /// <summary> Holds the routing key for the queue to send reports to. </summary> |
| public static string RESPONSE_ROUTING_KEY = "response"; |
| |
| /// <summary> Holds the connection to listen on. </summary> |
| private IConnection connection; |
| |
| /// <summary> Holds the channel for all test messages.</summary> |
| private IChannel channel; |
| |
| /// <summary> Holds the producer to send report messages on. </summary> |
| private IMessagePublisher publisher; |
| |
| /// <summary> Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. </summary> */ |
| private bool init; |
| |
| /// <summary> Holds the count of messages received by this listener. </summary> */ |
| private int count; |
| |
| /// <summary> Creates a topic listener using the specified broker URL. </summary> |
| /// |
| /// <param name="connectionUri">The broker URL to listen on.</param> |
| TopicListener(string connectionUri) |
| { |
| log.Debug("TopicListener(string connectionUri = " + connectionUri + "): called"); |
| |
| // Create a connection to the broker. |
| IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(connectionUri); |
| connection = new AMQConnection(connectionInfo); |
| |
| // Establish a session on the broker. |
| channel = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1); |
| |
| // Set up a queue to listen for test messages on. |
| string topicQueueName = channel.GenerateUniqueName(); |
| channel.DeclareQueue(topicQueueName, false, true, true); |
| |
| // Set this listener up to listen for incoming messages on the test topic queue. |
| channel.Bind(topicQueueName, ExchangeNameDefaults.TOPIC, CONTROL_ROUTING_KEY); |
| IMessageConsumer consumer = channel.CreateConsumerBuilder(topicQueueName) |
| .Create(); |
| consumer.OnMessage += new MessageReceivedDelegate(OnMessage); |
| |
| // Set up this listener with a producer to send the reports on. |
| publisher = channel.CreatePublisherBuilder() |
| .WithExchangeName(ExchangeNameDefaults.DIRECT) |
| .WithRoutingKey(RESPONSE_ROUTING_KEY) |
| .Create(); |
| |
| connection.Start(); |
| Console.WriteLine("Waiting for messages..."); |
| } |
| |
| public static void Main(String[] argv) |
| { |
| // Create an instance of this listener with the command line parameters. |
| new TopicListener(DEFAULT_URI); |
| } |
| |
| /// <summary> |
| /// Handles all message received by this listener. Test messages are counted, report messages result in a report being sent and |
| /// shutdown messages result in this listener being terminated. |
| /// </summary> |
| /// |
| /// <param name="message">The received message.</param> |
| public void OnMessage(IMessage message) |
| { |
| log.Debug("public void onMessage(Message message = " + message + "): called"); |
| |
| // Take the start time of the first message if this is the first message. |
| if (!init) |
| { |
| count = 0; |
| init = true; |
| } |
| |
| // Check if the message is a control message telling this listener to shut down. |
| if (IsShutdown(message)) |
| { |
| log.Debug("Got a shutdown message."); |
| Shutdown(); |
| } |
| // Check if the message is a report request message asking this listener to respond with the message count. |
| else if (IsReport(message)) |
| { |
| log.Debug("Got a report request message."); |
| |
| // Send the message count report. |
| SendReport(); |
| |
| // Reset the initialization flag so that the next message is considered to be the first. |
| init = false; |
| } |
| // Otherwise it is an ordinary test message, so increment the message count. |
| else |
| { |
| count++; |
| } |
| } |
| |
| /// <summary> Checks a message to see if it is a shutdown control message. </summary> |
| /// |
| /// <param name="m">The message to check.</param> |
| /// |
| /// <returns><tt>true</tt> if it is a shutdown control message, <tt>false</tt> otherwise.</returns> |
| private bool IsShutdown(IMessage m) |
| { |
| bool result = CheckTextField(m, "TYPE", "TERMINATION_REQUEST"); |
| |
| //log.Debug("isShutdown = " + result); |
| |
| return result; |
| } |
| |
| /// <summary> Checks a message to see if it is a report request control message. </summary> |
| /// |
| /// <param name="m">The message to check.</param> |
| /// |
| /// <returns><tt>true</tt> if it is a report request control message, <tt>false</tt> otherwise.</returns> |
| private bool IsReport(IMessage m) |
| { |
| bool result = CheckTextField(m, "TYPE", "REPORT_REQUEST"); |
| |
| //log.Debug("isReport = " + result); |
| |
| return result; |
| } |
| |
| /// <summary> Checks whether or not a text field on a message has the specified value. </summary> |
| /// |
| /// <param name="e">The message to check.</param> |
| /// <param name="e">The name of the field to check.</param> |
| /// <param name="e">The expected value of the field to compare with.</param> |
| /// |
| /// <returns> <tt>true</tt>If the specified field has the specified value, <tt>fals</tt> otherwise. </returns> |
| private static bool CheckTextField(IMessage m, string fieldName, string value) |
| { |
| /*log.Debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName |
| + ", String value = " + value + "): called");*/ |
| |
| string comp = m.Headers.GetString(fieldName); |
| |
| return (comp != null) && comp == value; |
| } |
| |
| /// <summary> Stops the message consumer and closes the connection. </summary> |
| private void Shutdown() |
| { |
| connection.Stop(); |
| channel.Dispose(); |
| connection.Dispose(); |
| } |
| |
| /// <summary> Sends the report message to the response location. </summary> |
| private void SendReport() |
| { |
| string report = "Received " + count + "."; |
| |
| IMessage reportMessage = channel.CreateTextMessage(report); |
| |
| reportMessage.Headers.SetBoolean("BOOLEAN", false); |
| //reportMessage.Headers.SetByte("BYTE", 5); |
| reportMessage.Headers.SetDouble("DOUBLE", 3.141); |
| reportMessage.Headers.SetFloat("FLOAT", 1.0f); |
| reportMessage.Headers.SetInt("INT", 1); |
| reportMessage.Headers.SetLong("LONG", 1); |
| reportMessage.Headers.SetString("STRING", "hello"); |
| reportMessage.Headers.SetShort("SHORT", 2); |
| |
| publisher.Send(reportMessage); |
| |
| Console.WriteLine("Sent report: " + report); |
| } |
| } |
| } |