| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using System.Collections.Generic; |
| using System.Threading.Tasks; |
| using Apache.Qpid.Proton.Client.Exceptions; |
| using Apache.Qpid.Proton.Client.Concurrent; |
| using Apache.Qpid.Proton.Engine; |
| using Apache.Qpid.Proton.Types.Messaging; |
| using Apache.Qpid.Proton.Logging; |
| |
| namespace Apache.Qpid.Proton.Client.Implementation |
| { |
| /// <summary> |
| /// Implements the stream sender using a stateful current outgoing message that prevents |
| /// any sends other than to the current message until the current is completed. |
| /// </summary> |
| public sealed class ClientStreamSender : IStreamSender |
| { |
| private static IProtonLogger LOG = ProtonLoggerFactory.GetLogger<ClientStreamSender>(); |
| |
| private readonly AtomicBoolean closed = new AtomicBoolean(); |
| private ClientException failureCause; |
| |
| private readonly StreamSenderOptions options; |
| private readonly ClientSession session; |
| private readonly string senderId; |
| private readonly bool sendsSettled; |
| private readonly TaskCompletionSource<IStreamSender> openFuture = new TaskCompletionSource<IStreamSender>(); |
| private readonly TaskCompletionSource<IStreamSender> closeFuture = new TaskCompletionSource<IStreamSender>(); |
| |
| private Engine.ISender protonSender; |
| private Action<ISender> senderRemotelyClosedHandler; |
| |
| private volatile ISource remoteSource; |
| private volatile ITarget remoteTarget; |
| |
| internal ClientStreamSender(ClientSession session, StreamSenderOptions options, string senderId, Engine.ISender protonSender) |
| { |
| this.options = new StreamSenderOptions(options); |
| this.session = session; |
| this.senderId = senderId; |
| this.protonSender = protonSender; |
| this.protonSender.LinkedResource = this; |
| this.sendsSettled = protonSender.SenderSettleMode == Types.Transport.SenderSettleMode.Settled; |
| } |
| |
| public IClient Client => session.Client; |
| |
| public IConnection Connection => session.Connection; |
| |
| public ISession Session => session; |
| |
| public Task<ISender> OpenTask => throw new NotImplementedException(); |
| |
| public string Address |
| { |
| get |
| { |
| if (IsDynamic) |
| { |
| WaitForOpenToComplete(); |
| return (protonSender.RemoteTerminus as ITarget)?.Address; |
| } |
| else |
| { |
| return protonSender.Target?.Address; |
| } |
| } |
| } |
| |
| public ISource Source |
| { |
| get |
| { |
| WaitForOpenToComplete(); |
| return remoteSource; |
| } |
| } |
| |
| public ITarget Target |
| { |
| get |
| { |
| WaitForOpenToComplete(); |
| return remoteTarget; |
| } |
| } |
| |
| public IReadOnlyDictionary<string, object> Properties |
| { |
| get |
| { |
| WaitForOpenToComplete(); |
| return ClientConversionSupport.ToStringKeyedMap(protonSender.RemoteProperties); |
| } |
| } |
| |
| public IReadOnlyCollection<string> OfferedCapabilities |
| { |
| get |
| { |
| WaitForOpenToComplete(); |
| return ClientConversionSupport.ToStringArray(protonSender.OfferedCapabilities); |
| } |
| } |
| |
| public IReadOnlyCollection<string> DesiredCapabilities |
| { |
| get |
| { |
| WaitForOpenToComplete(); |
| return ClientConversionSupport.ToStringArray(protonSender.DesiredCapabilities); |
| } |
| } |
| |
| public IStreamSenderMessage BeginMessage(IDictionary<string, object> deliveryAnnotations = null) |
| { |
| CheckClosedOrFailed(); |
| DeliveryAnnotations annotations = null; |
| |
| if (deliveryAnnotations != null) |
| { |
| annotations = new DeliveryAnnotations(ClientConversionSupport.ToSymbolKeyedMap(deliveryAnnotations)); |
| } |
| |
| throw new NotImplementedException(); |
| } |
| |
| public void Close(IErrorCondition error = null) |
| { |
| throw new NotImplementedException(); |
| } |
| |
| public Task<ISender> CloseAsync(IErrorCondition error = null) |
| { |
| throw new NotImplementedException(); |
| } |
| |
| public void Detach(IErrorCondition error = null) |
| { |
| throw new NotImplementedException(); |
| } |
| |
| public Task<ISender> DetachAsync(IErrorCondition error = null) |
| { |
| throw new NotImplementedException(); |
| } |
| |
| public void Dispose() |
| { |
| throw new NotImplementedException(); |
| } |
| |
| public ITracker Send<T>(IMessage<T> message, IDictionary<string, object> deliveryAnnotations = null) |
| { |
| throw new NotImplementedException(); |
| } |
| |
| public ITracker TrySend<T>(IMessage<T> message, IDictionary<string, object> deliveryAnnotations = null) |
| { |
| throw new NotImplementedException(); |
| } |
| |
| #region Internal Stream Sender API |
| |
| internal string SenderId => senderId; |
| |
| internal bool IsClosed => closed; |
| |
| internal bool IsDynamic => protonSender.Target?.Dynamic ?? false; |
| |
| internal Engine.ISender ProtonSender => protonSender; |
| |
| internal StreamSenderOptions Options => options; |
| |
| internal ClientStreamSender Open() |
| { |
| // TODO |
| |
| return this; |
| } |
| |
| internal void Disposition(IOutgoingDelivery delivery, Types.Transport.IDeliveryState state, bool settled) |
| { |
| CheckClosedOrFailed(); |
| // TODO |
| // executor.execute(() -> { |
| // delivery.disposition(state, settled); |
| // }); |
| } |
| |
| internal void Abort(IOutgoingDelivery delivery, ClientStreamTracker tracker) |
| { |
| throw new NotImplementedException(); |
| } |
| |
| internal IStreamTracker SendMessage<E>(ClientStreamSenderMessage context, IAdvancedMessage<E> message) |
| { |
| throw new NotImplementedException(); |
| } |
| |
| internal void Complete(IOutgoingDelivery delivery, ClientStreamTracker tracker) |
| { |
| throw new NotImplementedException(); |
| } |
| |
| #endregion |
| |
| #region Private Receiver Implementation |
| |
| private void CheckClosedOrFailed() |
| { |
| if (IsClosed) |
| { |
| throw new ClientIllegalStateException("The Stream Sender was explicitly closed", failureCause); |
| } |
| else if (failureCause != null) |
| { |
| throw failureCause; |
| } |
| } |
| |
| private void WaitForOpenToComplete() |
| { |
| if (!openFuture.Task.IsCompleted || openFuture.Task.IsFaulted) |
| { |
| try |
| { |
| openFuture.Task.Wait(); |
| } |
| catch (Exception e) |
| { |
| throw failureCause ?? ClientExceptionSupport.CreateNonFatalOrPassthrough(e); |
| } |
| } |
| } |
| |
| #endregion |
| } |
| } |