/* | |
* | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, | |
* software distributed under the License is distributed on an | |
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
* KIND, either express or implied. See the License for the | |
* specific language governing permissions and limitations | |
* under the License. | |
* | |
*/ | |
using System; | |
using System.Collections.Generic; | |
using System.Collections.ObjectModel; | |
using Org.Apache.Qpid.Messaging; | |
using Org.Apache.Qpid.Messaging.SessionReceiver; | |
namespace Org.Apache.Qpid.Messaging.Examples | |
{ | |
/// <summary> | |
/// A class with functions to display structured messages. | |
/// </summary> | |
public static class MessageViewer | |
{ | |
/// <summary> | |
/// A Function to display a amqp/map message packaged as a Dictionary. | |
/// </summary> | |
/// <param name="dict">The AMQP map</param> | |
/// <param name="level">Nested depth</param> | |
public static void ShowDictionary(Dictionary<string, object> dict, int level) | |
{ | |
foreach (KeyValuePair<string, object> kvp in dict) | |
{ | |
Console.Write(new string(' ', level * 4)); | |
if (QpidTypeCheck.ObjectIsMap(kvp.Value)) | |
{ | |
Console.WriteLine("Key: {0}, Value: Dictionary", kvp.Key); | |
ShowDictionary((Dictionary<string, object>)kvp.Value, level + 1); | |
} | |
else if (QpidTypeCheck.ObjectIsList(kvp.Value)) | |
{ | |
Console.WriteLine("Key: {0}, Value: List", kvp.Key); | |
ShowList((Collection<object>)kvp.Value, level + 1); | |
} | |
else | |
Console.WriteLine("Key: {0}, Value: {1}, Type: {2}", | |
kvp.Key, kvp.Value, kvp.Value.GetType().ToString()); | |
} | |
} | |
/// <summary> | |
/// A function to display a ampq/list message packaged as a List. | |
/// </summary> | |
/// <param name="list">The AMQP list</param> | |
/// <param name="level">Nested depth</param> | |
public static void ShowList(Collection<object> list, int level) | |
{ | |
foreach (object obj in list) | |
{ | |
Console.Write(new string(' ', level * 4)); | |
if (QpidTypeCheck.ObjectIsMap(obj)) | |
{ | |
Console.WriteLine("Dictionary"); | |
ShowDictionary((Dictionary<string, object>)obj, level + 1); | |
} | |
else if (QpidTypeCheck.ObjectIsList(obj)) | |
{ | |
Console.WriteLine("List"); | |
ShowList((Collection<object>)obj, level + 1); | |
} | |
else | |
Console.WriteLine("Value: {0}, Type: {1}", | |
obj.ToString(), obj.GetType().ToString()); | |
} | |
} | |
/// <summary> | |
/// A function to diplay a Message. The native Object type is | |
/// decomposed into AMQP types. | |
/// </summary> | |
/// <param name="message">The Message</param> | |
public static void ShowMessage(Message message) | |
{ | |
if ("amqp/map" == message.ContentType) | |
{ | |
Console.WriteLine("Received a Dictionary"); | |
Dictionary<string, object> content = new Dictionary<string, object>(); | |
message.GetContent(content); | |
ShowDictionary(content, 0); | |
} | |
else if ("amqp/list" == message.ContentType) | |
{ | |
Console.WriteLine("Received a List"); | |
Collection<object> content = new Collection<object>(); | |
message.GetContent(content); | |
ShowList(content, 0); | |
} | |
else | |
{ | |
Console.WriteLine("Received a String"); | |
Console.WriteLine(message.GetContent()); | |
} | |
} | |
} | |
/// <summary> | |
/// A model class to demonstrate how a user may use the Qpid Messaging | |
/// interface to receive Session messages using a callback. | |
/// </summary> | |
class ReceiverProcess : ISessionReceiver | |
{ | |
UInt32 messagesReceived = 0; | |
/// <summary> | |
/// SessionReceiver implements the ISessionReceiver interface. | |
/// It is the callback function that receives all messages for a Session. | |
/// It may be called any time server is running. | |
/// It is always called on server's private thread. | |
/// </summary> | |
/// <param name="receiver">The Receiver associated with the message.</param> | |
/// <param name="message">The Message</param> | |
public void SessionReceiver(Receiver receiver, Message message) | |
{ | |
// | |
// Indicate message reception | |
// | |
Console.WriteLine("--- Message {0}", ++messagesReceived); | |
// | |
// Display the received message | |
// | |
MessageViewer.ShowMessage(message); | |
// | |
// Acknowledge the receipt of all received messages. | |
// | |
receiver.Session.Acknowledge(); | |
} | |
/// <summary> | |
/// SessionReceiver implements the ISessionReceiver interface. | |
/// It is the exception function that receives all exception messages | |
/// It may be called any time server is running. | |
/// It is always called on server's private thread. | |
/// After this is called then the sessionReceiver and private thread are closed. | |
/// </summary> | |
/// <param name="exception">The exception.</param> | |
public void SessionException(Exception exception) | |
{ | |
// A typical application will take more action here. | |
Console.WriteLine("{0} Exception caught.", exception.ToString()); | |
} | |
/// <summary> | |
/// Usage | |
/// </summary> | |
/// <param name="url">Connection target</param> | |
/// <param name="addr">Address: broker exchange + routing key</param> | |
/// <param name="nSec">n seconds to keep callback open</param> | |
static void usage(string url, string addr, int nSec) | |
{ | |
Console.WriteLine("usage: {0} [url [addr [nSec]]]", | |
System.Diagnostics.Process.GetCurrentProcess().ProcessName); | |
Console.WriteLine(); | |
Console.WriteLine("A program to connect to a broker and receive"); | |
Console.WriteLine("messages from a named exchange with a routing key."); | |
Console.WriteLine("The receiver uses a session callback and keeps the callback"); | |
Console.WriteLine("server open for so many seconds."); | |
Console.WriteLine("The details of the message body's types and values are shown."); | |
Console.WriteLine(); | |
Console.WriteLine(" url = target address for 'new Connection(url)'"); | |
Console.WriteLine(" addr = address for 'session.CreateReceiver(addr)'"); | |
Console.WriteLine(" nSec = time in seconds to keep the receiver callback open"); | |
Console.WriteLine(); | |
Console.WriteLine("Default values:"); | |
Console.WriteLine(" {0} {1} {2} {3}", | |
System.Diagnostics.Process.GetCurrentProcess().ProcessName, | |
url, addr, nSec); | |
} | |
/// <summary> | |
/// A function to illustrate how to open a Session callback and | |
/// receive messages. | |
/// </summary> | |
/// <param name="args">Main program arguments</param> | |
public int TestProgram(string[] args) | |
{ | |
string url = "amqp:tcp:localhost:5672"; | |
string addr = "amq.direct/map_example"; | |
int nSec = 30; | |
string connectionOptions = ""; | |
if (1 == args.Length) | |
{ | |
if (args[0].Equals("-h") || args[0].Equals("-H") || args[0].Equals("/?")) | |
{ | |
usage(url, addr, nSec); | |
return 1; | |
} | |
} | |
if (args.Length > 0) | |
url = args[0]; | |
if (args.Length > 1) | |
addr = args[1]; | |
if (args.Length > 2) | |
nSec = System.Convert.ToInt32(args[2]); | |
if (args.Length > 3) | |
connectionOptions = args[3]; | |
// | |
// Create and open an AMQP connection to the broker URL | |
// | |
Connection connection = new Connection(url, connectionOptions); | |
connection.Open(); | |
// | |
// Create a session. | |
// | |
Session session = connection.CreateSession(); | |
// | |
// Receive through callback | |
// | |
// Create callback server and implicitly start it | |
// | |
SessionReceiver.CallbackServer cbServer = | |
new SessionReceiver.CallbackServer(session, this); | |
// | |
// The callback server is running and executing callbacks on a | |
// separate thread. | |
// | |
// | |
// Create a receiver for the direct exchange using the | |
// routing key "map_example". | |
// | |
Receiver receiver = session.CreateReceiver(addr); | |
// | |
// Establish a capacity | |
// | |
receiver.Capacity = 100; | |
// | |
// Wait so many seconds for messages to arrive. | |
// | |
System.Threading.Thread.Sleep(nSec * 1000); // in mS | |
// | |
// Stop the callback server. | |
// | |
cbServer.Close(); | |
// | |
// Close the receiver and the connection. | |
// | |
try | |
{ | |
receiver.Close(); | |
connection.Close(); | |
} | |
catch (Exception exception) | |
{ | |
// receiver or connection may throw if they closed in error. | |
// A typical application will take more action here. | |
Console.WriteLine("{0} Closing exception caught.", exception.ToString()); | |
} | |
return 0; | |
} | |
} | |
class MapCallbackReceiverMain | |
{ | |
/// <summary> | |
/// Main program | |
/// </summary> | |
/// <param name="args">Main prgram args</param> | |
static int Main(string[] args) | |
{ | |
// Invoke 'TestProgram' as non-static class. | |
ReceiverProcess mainProc = new ReceiverProcess(); | |
int result = mainProc.TestProgram(args); | |
return result; | |
} | |
} | |
} | |