blob: 2ce493626f500950b78328fed38ba203432661a7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
using System;
using System.IO;
using System.Text;
using System.Threading;
using org.apache.qpid.client;
using org.apache.qpid.transport;
namespace org.apache.qpid.example.requestresponse
{
/// <summary>
/// This program is one of two programs that illustrate the
/// request/response pattern.
///
/// Client:
/// Make requests of a service, print the response.
///
/// Server (this program):
/// Accept requests, set the letters to uppercase in each message, and
/// return it as a response.
///
/// </summary>
class Server
{
    /// <summary>
    /// Connects to the broker, declares the "request" queue, binds it to
    /// amq.direct, subscribes a <see cref="MessageListener"/> to it and then
    /// blocks until the listener pulses the session monitor.
    /// </summary>
    /// <param name="args">
    /// Optional arguments: args[0] is the broker host (default "localhost"),
    /// args[1] is the broker port (default 5672).
    /// </param>
    static void Main(string[] args)
    {
        string host = args.Length > 0 ? args[0] : "localhost";
        int port = args.Length > 1 ? Convert.ToInt32(args[1]) : 5672;
        client.Client connection = new client.Client();
        // Set once connect() has returned, so cleanup never tries to close a
        // connection that was not established.
        bool connected = false;
        try
        {
            connection.connect(host, port, "test", "guest", "guest");
            connected = true;
            ClientSession session = connection.createSession(50000);
            //--------- Main body of program --------------------------------------------
            // Create a request queue for clients to use when making
            // requests.
            const string request_queue = "request";
            // Use the name of the request queue as the routing key
            session.queueDeclare(request_queue);
            session.exchangeBind(request_queue, "amq.direct", request_queue);
            // Take the session monitor before subscribing: the listener must
            // acquire the same monitor to pulse it, so its signal cannot be
            // raised before this thread has started waiting.
            lock (session)
            {
                // Create a listener and subscribe it to the request_queue
                IMessageListener listener = new MessageListener(session);
                session.attachMessageListener(listener, request_queue);
                session.messageSubscribe(request_queue);
                // Block until the listener signals that it has handled the final request
                Console.WriteLine("Waiting for requests");
                Monitor.Wait(session);
            }
            //---------------------------------------------------------------------------
        }
        catch (Exception e)
        {
            // Print the whole exception (type, message and stack trace),
            // not just the stack trace.
            Console.WriteLine("Error: \n" + e);
        }
        finally
        {
            // Release the connection on both the success and the failure path.
            if (connected)
            {
                try
                {
                    connection.close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error: \n" + e);
                }
            }
        }
    }
}
/// <summary>
/// Message listener that answers each request with the upper-cased request
/// text, sent through amq.direct using the routing key taken from the
/// request's reply-to property.
/// </summary>
public class MessageListener : IMessageListener
{
    private readonly ClientSession _session;
    // Ids of the requests handled so far; accepted in one batch when the
    // terminating request arrives.
    private readonly RangeSet _range = new RangeSet();

    /// <summary>
    /// Creates a listener that replies and acknowledges on the given session.
    /// </summary>
    /// <param name="session">
    /// Session used to send responses and accept requests; its monitor is
    /// pulsed when the terminating request has been handled.
    /// </param>
    public MessageListener(ClientSession session)
    {
        _session = session;
    }

    /// <summary>
    /// Handles one request: reads its body as text, sends the upper-cased
    /// text to the request's reply-to routing key and records the request
    /// id for acknowledgement. A request without a reply-to is reported on
    /// the console and otherwise ignored.
    /// </summary>
    /// <param name="request">The incoming request message.</param>
    public void messageTransfer(IMessage request)
    {
        // Without a reply-to there is nowhere to send the response.
        if (!request.MessageProperties.hasReplyTo())
        {
            Console.WriteLine("Error: \n No routing key for request " + request);
            return;
        }
        // Get routing key for response from the request's replyTo property
        string routingKey = request.MessageProperties.getReplyTo().getRoutingKey();

        string message = ReadBodyAsText(request.Body);
        Console.WriteLine("Request: " + message);

        // Transform message content to upper case. The invariant culture
        // keeps the reply independent of the server's locale settings.
        string responseBody = message.ToUpperInvariant();

        // Send it back to the user
        IMessage response = new Message();
        response.clearData();
        response.appendData(Encoding.UTF8.GetBytes(responseBody));
        _session.messageTransfer("amq.direct", routingKey, response);

        // Add this message to the list of messages to be acknowledged
        _range.add(request.Id);
        if (message.Equals("That's all, folks!"))
        {
            // Acknowledge all the received messages
            _session.messageAccept(_range);
            // Wake up whoever is waiting on the session monitor.
            lock (_session)
            {
                Monitor.Pulse(_session);
            }
        }
    }

    /// <summary>
    /// Reads everything from the stream's current position to its end and
    /// decodes it as UTF-8, the same encoding used for the response body.
    /// </summary>
    private static string ReadBodyAsText(Stream body)
    {
        byte[] buffer = new byte[body.Length - body.Position];
        int filled = 0;
        // A single Read call may return fewer bytes than requested, so keep
        // reading until the buffer is full or the stream ends.
        while (filled < buffer.Length)
        {
            int read = body.Read(buffer, filled, buffer.Length - filled);
            if (read <= 0)
            {
                break;
            }
            filled += read;
        }
        return Encoding.UTF8.GetString(buffer, 0, filled);
    }
}
}