| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| using System; |
| using System.Text; |
| using System.Threading; |
| using log4net; |
| using NUnit.Framework; |
| using Apache.Qpid.Messaging; |
| |
| namespace Apache.Qpid.Client.Tests |
| { |
| [TestFixture] |
| public class ProducerMultiConsumer : BaseMessagingTestFixture |
| { |
| private static readonly ILog _logger = LogManager.GetLogger(typeof(ProducerMultiConsumer)); |
| |
| private string _commandQueueName = "ServiceQ1"; |
| |
| private const int CONSUMER_COUNT = 5; |
| |
| private const int MESSAGE_COUNT = 1000; |
| |
| private const string MESSAGE_DATA_BYTES = "****jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk "; |
| |
| AutoResetEvent _finishedEvent = new AutoResetEvent(false); |
| |
| private static String GetData(int size) |
| { |
| StringBuilder buf = new StringBuilder(size); |
| int count = 0; |
| while (count < size + MESSAGE_DATA_BYTES.Length) |
| { |
| buf.Append(MESSAGE_DATA_BYTES); |
| count += MESSAGE_DATA_BYTES.Length; |
| } |
| if (count < size) |
| { |
| buf.Append(MESSAGE_DATA_BYTES, 0, size - count); |
| } |
| |
| return buf.ToString(); |
| } |
| |
| private IMessagePublisher _publisher; |
| |
| private IMessageConsumer[] _consumers = new IMessageConsumer[CONSUMER_COUNT]; |
| |
| private int _messageReceivedCount = 0; |
| |
| [SetUp] |
| public override void Init() |
| { |
| base.Init(); |
| _publisher = _channel.CreatePublisherBuilder() |
| .WithRoutingKey(_commandQueueName) |
| .WithExchangeName(ExchangeNameDefaults.TOPIC) |
| .Create(); |
| |
| _publisher.DisableMessageTimestamp = true; |
| _publisher.DeliveryMode = DeliveryMode.NonPersistent; |
| |
| for (int i = 0; i < CONSUMER_COUNT; i++) |
| { |
| string queueName = _channel.GenerateUniqueName(); |
| _channel.DeclareQueue(queueName, false, true, true); |
| |
| _channel.Bind(queueName, ExchangeNameDefaults.TOPIC, _commandQueueName); |
| |
| _consumers[i] = _channel.CreateConsumerBuilder(queueName) |
| .WithPrefetchLow(100).Create(); |
| _consumers[i].OnMessage = new MessageReceivedDelegate(OnMessage); |
| } |
| _connection.Start(); |
| } |
| |
| public void OnMessage(IMessage m) |
| { |
| int newCount = Interlocked.Increment(ref _messageReceivedCount); |
| if (newCount % 1000 == 0) _logger.Info("Received count=" + newCount); |
| if (newCount == (MESSAGE_COUNT * CONSUMER_COUNT)) |
| { |
| _logger.Info("All messages received"); |
| _finishedEvent.Set(); |
| } |
| if ( newCount % 100 == 0 ) |
| System.Diagnostics.Debug.WriteLine(((ITextMessage)m).Text); |
| } |
| |
| [Test] |
| public void RunTest() |
| { |
| for (int i = 0; i < MESSAGE_COUNT; i++) |
| { |
| ITextMessage msg; |
| try |
| { |
| msg = _channel.CreateTextMessage(GetData(512 + 8*i)); |
| } |
| catch (Exception e) |
| { |
| _logger.Error("Error creating message: " + e, e); |
| break; |
| } |
| _publisher.Send(msg); |
| } |
| _finishedEvent.WaitOne(); |
| } |
| } |
| } |