/* | |
* | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, | |
* software distributed under the License is distributed on an | |
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
* KIND, either express or implied. See the License for the | |
* specific language governing permissions and limitations | |
* under the License. | |
* | |
*/ | |
using System; | |
using System.Text; | |
using System.Threading; | |
using log4net; | |
using NUnit.Framework; | |
using Apache.Qpid.Messaging; | |
namespace Apache.Qpid.Client.Tests | |
{ | |
[TestFixture] | |
public class TestSyncConsumer : BaseMessagingTestFixture | |
{ | |
private static readonly ILog _logger = LogManager.GetLogger(typeof(TestSyncConsumer)); | |
private string _commandQueueName = "ServiceQ1"; | |
private const int MESSAGE_COUNT = 1000; | |
private const string MESSAGE_DATA_BYTES = "jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk "; | |
private static String GetData(int size) | |
{ | |
StringBuilder buf = new StringBuilder(size); | |
int count = 0; | |
while ( count < size + MESSAGE_DATA_BYTES.Length ) | |
{ | |
buf.Append(MESSAGE_DATA_BYTES); | |
count += MESSAGE_DATA_BYTES.Length; | |
} | |
if ( count < size ) | |
{ | |
buf.Append(MESSAGE_DATA_BYTES, 0, size - count); | |
} | |
return buf.ToString(); | |
} | |
private IMessageConsumer _consumer; | |
private IMessagePublisher _publisher; | |
[SetUp] | |
public override void Init() | |
{ | |
base.Init(); | |
_publisher = _channel.CreatePublisherBuilder() | |
.WithRoutingKey(_commandQueueName) | |
.WithExchangeName(ExchangeNameDefaults.TOPIC) | |
.Create(); | |
_publisher.DisableMessageTimestamp = true; | |
_publisher.DeliveryMode = DeliveryMode.NonPersistent; | |
string queueName = _channel.GenerateUniqueName(); | |
_channel.DeclareQueue(queueName, false, true, true); | |
_channel.Bind(queueName, ExchangeNameDefaults.TOPIC, _commandQueueName); | |
_consumer = _channel.CreateConsumerBuilder(queueName) | |
.WithPrefetchLow(100).Create(); | |
_connection.Start(); | |
} | |
[Test] | |
public void ReceiveWithInfiniteWait() | |
{ | |
// send all messages | |
for ( int i = 0; i < MESSAGE_COUNT; i++ ) | |
{ | |
ITextMessage msg; | |
try | |
{ | |
msg = _channel.CreateTextMessage(GetData(512 + 8 * i)); | |
} catch ( Exception e ) | |
{ | |
_logger.Error("Error creating message: " + e, e); | |
break; | |
} | |
_publisher.Send(msg); | |
} | |
_logger.Debug("All messages sent"); | |
// receive all messages | |
for ( int i = 0; i < MESSAGE_COUNT; i++ ) | |
{ | |
try | |
{ | |
IMessage msg = _consumer.Receive(); | |
Assert.IsNotNull(msg); | |
} catch ( Exception e ) | |
{ | |
_logger.Error("Error receiving message: " + e, e); | |
Assert.Fail(e.ToString()); | |
} | |
} | |
} | |
[Test] | |
public void ReceiveWithTimeout() | |
{ | |
ITextMessage msg = _channel.CreateTextMessage(GetData(512 + 8)); | |
_publisher.Send(msg); | |
IMessage recvMsg = _consumer.Receive(); | |
Assert.IsNotNull(recvMsg); | |
// empty queue, should timeout | |
Assert.IsNull(_consumer.Receive(1000)); | |
} | |
} | |
} |