/* | |
* | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, | |
* software distributed under the License is distributed on an | |
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
* KIND, either express or implied. See the License for the | |
* specific language governing permissions and limitations | |
* under the License. | |
* | |
*/ | |
using System; | |
using System.Threading; | |
using log4net; | |
using NUnit.Framework; | |
using Apache.Qpid.Messaging; | |
namespace Apache.Qpid.Integration.Tests.testcases | |
{ | |
[TestFixture, Category("Integration")] | |
public class ServiceProvidingClient : BaseMessagingTestFixture | |
{ | |
private static ILog _logger = LogManager.GetLogger(typeof(ServiceProvidingClient)); | |
private int _messageCount; | |
private string _replyToExchangeName; | |
private string _replyToRoutingKey; | |
const int PACK = 100; | |
private IMessagePublisher _destinationPublisher; | |
private IMessageConsumer _consumer; | |
private string _serviceName = "ServiceQ1"; | |
private string _selector = null; | |
[SetUp] | |
public override void Init() | |
{ | |
base.Init(); | |
_logger.Info("Starting..."); | |
_logger.Info("Service (queue) name is '" + _serviceName + "'..."); | |
_connection.ExceptionListener = new ExceptionListenerDelegate(OnConnectionException); | |
_logger.Info("Message selector is <" + _selector + ">..."); | |
_channel.DeclareQueue(_serviceName, false, false, false); | |
_consumer = _channel.CreateConsumerBuilder(_serviceName) | |
.WithPrefetchLow(100) | |
.WithPrefetchHigh(500) | |
.WithNoLocal(true) | |
.Create(); | |
_consumer.OnMessage = new MessageReceivedDelegate(OnMessage); | |
} | |
public override void Shutdown() | |
{ | |
_consumer.Dispose(); | |
base.Shutdown(); | |
} | |
private void OnConnectionException(Exception e) | |
{ | |
_logger.Info("Connection exception occurred", e); | |
// XXX: Test still doesn't shutdown when broker terminates. Is there no heartbeat? | |
} | |
[Test] | |
public void Test() | |
{ | |
_connection.Start(); | |
_logger.Info("Waiting..."); | |
ServiceRequestingClient client = new ServiceRequestingClient(); | |
client.Init(); | |
client.SendMessages(); | |
} | |
private void OnMessage(IMessage message) | |
{ | |
// _logger.Info("Got message '" + message + "'"); | |
ITextMessage tm = (ITextMessage)message; | |
try | |
{ | |
string replyToExchangeName = tm.ReplyToExchangeName; | |
string replyToRoutingKey = tm.ReplyToRoutingKey; | |
_replyToExchangeName = replyToExchangeName; | |
_replyToRoutingKey = replyToRoutingKey; | |
_logger.Debug("About to create a producer"); | |
// Console.WriteLine("ReplyTo.ExchangeName = " + _replyToExchangeName); | |
// Console.WriteLine("ReplyTo.RoutingKey = " + _replyToRoutingKey); | |
_destinationPublisher = _channel.CreatePublisherBuilder() | |
.WithExchangeName(_replyToExchangeName) | |
.WithRoutingKey(_replyToRoutingKey) | |
.WithDeliveryMode(DeliveryMode.NonPersistent) | |
.Create(); | |
_destinationPublisher.DisableMessageTimestamp = true; | |
_logger.Debug("After create a producer"); | |
} | |
catch (QpidException e) | |
{ | |
_logger.Error("Error creating destination", e); | |
throw e; | |
} | |
_messageCount++; | |
if (_messageCount % PACK == 0) | |
{ | |
_logger.Info("Received message total: " + _messageCount); | |
_logger.Info(string.Format("Sending response to '{0}:{1}'", | |
_replyToExchangeName, _replyToRoutingKey)); | |
} | |
try | |
{ | |
String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.Text; | |
ITextMessage msg = _channel.CreateTextMessage(payload); | |
if ( tm.Headers.Contains("timeSent") ) | |
{ | |
msg.Headers["timeSent"] = tm.Headers["timeSent"]; | |
} | |
_destinationPublisher.Send(msg); | |
} catch ( QpidException e ) | |
{ | |
_logger.Error("Error sending message: " + e, e); | |
throw e; | |
} finally | |
{ | |
_destinationPublisher.Dispose(); | |
} | |
} | |
} | |
} |