| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| using System; |
| using System.Collections.Generic; |
| using System.IO; |
| using System.ServiceModel; |
| using System.ServiceModel.Channels; |
| using System.Text; |
| using System.Threading; |
| using org.apache.qpid.client; |
| using org.apache.qpid.transport; |
| using org.apache.qpid.transport.util; |
| |
| namespace org.apache.qpid.wcf.model |
| { |
/// <summary>
/// Input channel that pulls messages from a Qpid queue through a
/// <see cref="ClientSession"/> and decodes them with the binding's message encoder.
/// </summary>
internal sealed class QpidInputChannel : QpidInputChannelBase
{
    // Destination tag used for the subscription, the listener and flow control.
    private const string SubscriptionDestination = "myDest";

    private static readonly Logger _log = Logger.get(typeof (QpidInputChannel));

    private readonly QpidTransportBindingElement _bindingElement;
    private readonly MessageEncoder _encoder;
    private readonly ClientSession _session;
    private readonly string _queueName;
    private BlockingQueue _queue;
    private bool _closed = false;

    /// <summary>
    /// Creates the channel. The queue name is the string form of the address URI.
    /// </summary>
    /// <param name="context">Binding context supplying the transport element and, optionally, the encoder element.</param>
    /// <param name="session">Session used for all broker commands.</param>
    /// <param name="address">Local endpoint address.</param>
    public QpidInputChannel(BindingContext context, ClientSession session, EndpointAddress address)
        : base(context, address)
    {
        _bindingElement = context.Binding.Elements.Find<QpidTransportBindingElement>();
        var encoderElem = context.BindingParameters.Find<MessageEncodingBindingElement>();
        if (encoderElem != null)
        {
            _encoder = encoderElem.CreateMessageEncoderFactory().Encoder;
        }
        _session = session;
        _queueName = address.Uri.ToString();
        _queue = new BlockingQueue();
    }

    /// <summary>
    /// Grants one message credit, waits for a message, decodes it, accepts it and returns it.
    /// </summary>
    /// <param name="timeout">
    /// Currently not applied: the call waits on the internal queue without a time limit.
    /// </param>
    /// <returns>The decoded message, or null when the channel has been closed.</returns>
    /// <exception cref="InvalidOperationException">No message encoder was configured.</exception>
    /// <exception cref="CommunicationException">The message body could not be read or decoded.</exception>
    public override System.ServiceModel.Channels.Message Receive(TimeSpan timeout)
    {
        if (_encoder == null)
        {
            // Fail before consuming a message that could not be decoded anyway.
            throw new InvalidOperationException("No message encoder is configured for this channel.");
        }

        while (true)
        {
            // Snapshot the field: Close() sets it to null.
            BlockingQueue queue = _queue;
            if (_closed || queue == null)
            {
                return null;
            }

            _session.messageFlow(SubscriptionDestination, MessageCreditUnit.MESSAGE, 1);
            _session.sync();

            IMessage m = queue.Dequeue();
            if (m == null)
            {
                // Nothing delivered; re-check the closed flag and try again.
                continue;
            }

            // Throws on failure; the message is not accepted in that case.
            System.ServiceModel.Channels.Message result = Decode(m);
            result.Headers.To = LocalAddress.Uri;

            // ack this message
            var ack = new RangeSet();
            ack.add(m.Id);
            _session.messageAccept(ack);
            _session.sync();
            return result;
        }
    }

    /// <summary>
    /// Calls <see cref="Receive"/> and reports whether it produced a message.
    /// </summary>
    /// <param name="timeout">Forwarded to <see cref="Receive"/>.</param>
    /// <param name="message">The received message, or null.</param>
    /// <returns>True when a message was received.</returns>
    public override bool TryReceive(TimeSpan timeout, out System.ServiceModel.Channels.Message message)
    {
        message = Receive(timeout);
        return message != null;
    }

    /// <summary>
    /// Not implemented.
    /// </summary>
    /// <exception cref="NotImplementedException">Always.</exception>
    public override bool WaitForMessage(TimeSpan timeout)
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Marks the channel closed and drops the reference to the internal queue.
    /// </summary>
    /// <param name="timeout">Unused.</param>
    public override void Close(TimeSpan timeout)
    {
        _closed = true;
        _queue = null;
    }

    /// <summary>
    /// Declares the queue if the broker does not report it, binds it to "amq.direct",
    /// subscribes with explicit accept and sets up window flow control.
    /// </summary>
    /// <param name="timeout">Unused.</param>
    /// <exception cref="InvalidOperationException">The channel is neither Created nor Closed.</exception>
    public override void Open(TimeSpan timeout)
    {
        if (State != CommunicationState.Created && State != CommunicationState.Closed)
        {
            throw new InvalidOperationException(string.Format("Cannot open the channel from the {0} state.", State));
        }

        OnOpening();

        // Close() drops the queue; restore it so the listener never receives a null queue.
        if (_queue == null)
        {
            _queue = new BlockingQueue();
        }
        _closed = false;

        var qr = (QueueQueryResult) _session.queueQuery(_queueName).Result;
        if (qr.getQueue() == null)
        {
            // create the queue
            _session.queueDeclare(_queueName, null, null);
        }
        // bind the queue
        _session.exchangeBind(_queueName, "amq.direct", _queueName, null);
        var myListener = new WCFListener(_queue);
        _session.attachMessageListener(myListener, SubscriptionDestination);
        _session.messageSubscribe(_queueName, SubscriptionDestination, MessageAcceptMode.EXPLICIT,
                                  MessageAcquireMode.PRE_ACQUIRED,
                                  null,
                                  0, null);
        // issue credits
        _session.messageSetFlowMode(SubscriptionDestination, MessageFlowMode.WINDOW);
        _session.messageFlow(SubscriptionDestination, MessageCreditUnit.BYTE, ClientSession.MESSAGE_FLOW_MAX_BYTES);
        _session.sync();

        OnOpened();
    }

    // Reads the unread part of the message body and runs it through the encoder.
    private System.ServiceModel.Channels.Message Decode(IMessage m)
    {
        try
        {
            byte[] body = ReadRemaining(m.Body);
            // Clamp so that a limit above int.MaxValue does not wrap when narrowed.
            int maxSize = (int) Math.Min(int.MaxValue, _bindingElement.MaxReceivedMessageSize);
            return _encoder.ReadMessage(new MemoryStream(body), maxSize);
        }
        catch (Exception e)
        {
            throw new CommunicationException("Failed to decode the received message.", e);
        }
    }

    // Copies every byte from the stream's current position to its end.
    private static byte[] ReadRemaining(Stream stream)
    {
        var buffer = new byte[stream.Length - stream.Position];
        int offset = 0;
        while (offset < buffer.Length)
        {
            // A single Read call may return fewer bytes than requested.
            int read = stream.Read(buffer, offset, buffer.Length - offset);
            if (read <= 0)
            {
                throw new EndOfStreamException("Message body ended before the expected length was read.");
            }
            offset += read;
        }
        return buffer;
    }
}
| |
/// <summary>
/// Message listener that forwards every transferred message into a
/// <see cref="BlockingQueue"/> supplied at construction.
/// </summary>
internal class WCFListener : IMessageListener
{
    private readonly BlockingQueue _inbox;
    private static readonly Logger _log = Logger.get(typeof (WCFListener));

    /// <summary>
    /// Creates a listener that enqueues into <paramref name="q"/>.
    /// </summary>
    /// <param name="q">Queue that receives the transferred messages.</param>
    public WCFListener(BlockingQueue q)
    {
        this._inbox = q;
    }

    /// <summary>
    /// Logs the arrival at debug level and enqueues the message.
    /// </summary>
    /// <param name="m">The transferred message.</param>
    public void messageTransfer(IMessage m)
    {
        _log.debug("message received by listener");
        this._inbox.Enqueue(m);
    }
}
| |
/// <summary>
/// FIFO queue of <see cref="IMessage"/> whose dequeue operations wait, under a
/// monitor on the underlying queue, until an item is available.
/// </summary>
internal class BlockingQueue
{
    // Monitor.Wait(object, TimeSpan) rejects waits longer than Int32.MaxValue milliseconds.
    private static readonly TimeSpan MaxWait = TimeSpan.FromMilliseconds(int.MaxValue);

    private readonly Queue<IMessage> _queue = new Queue<IMessage>();

    /// <summary>
    /// Removes and returns the oldest message, waiting up to <paramref name="timeout"/>
    /// for one to arrive.
    /// </summary>
    /// <param name="timeout">Maximum total time to wait; zero or negative means no waiting.</param>
    /// <returns>The dequeued message, or null if none arrived in time.</returns>
    public IMessage Dequeue(TimeSpan timeout)
    {
        lock (_queue)
        {
            // Stopwatch is unaffected by wall-clock adjustments.
            var clock = System.Diagnostics.Stopwatch.StartNew();
            while (_queue.Count == 0)
            {
                TimeSpan elapsed = clock.Elapsed;
                if (timeout <= elapsed)
                {
                    break;
                }
                // timeout > elapsed >= 0 here, so the subtraction cannot overflow.
                TimeSpan remaining = timeout - elapsed;
                Monitor.Wait(_queue, remaining < MaxWait ? remaining : MaxWait);
            }
            return _queue.Count > 0 ? _queue.Dequeue() : null;
        }
    }

    /// <summary>
    /// Removes and returns the oldest message, waiting without a time limit
    /// until one is available.
    /// </summary>
    /// <returns>The dequeued message.</returns>
    public IMessage Dequeue()
    {
        lock (_queue)
        {
            // Loop: a woken waiter must re-check, another consumer may have taken the item.
            while (_queue.Count == 0)
            {
                Monitor.Wait(_queue);
            }
            return _queue.Dequeue();
        }
    }

    /// <summary>
    /// Appends a message and wakes one waiting consumer. Null is ignored.
    /// </summary>
    /// <param name="data">The message to enqueue.</param>
    public void Enqueue(IMessage data)
    {
        if (data == null)
        {
            return;
        }
        lock (_queue)
        {
            _queue.Enqueue(data);
            Monitor.Pulse(_queue);
        }
    }
}
| } |
| |