blob: cf082d01993c577d3a59f699ac6e7cb89d1168de [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Apache.Qpid.Proton.Client.Exceptions;
using Apache.Qpid.Proton.Client.Concurrent;
using Apache.Qpid.Proton.Engine;
using Apache.Qpid.Proton.Types.Messaging;
using Apache.Qpid.Proton.Logging;
namespace Apache.Qpid.Proton.Client.Implementation
{
/// <summary>
/// Implements the stream sender using a stateful current outgoing message that prevents
/// any sends other than to the current message until the current is completed.
/// </summary>
public sealed class ClientStreamSender : IStreamSender
{
private static IProtonLogger LOG = ProtonLoggerFactory.GetLogger<ClientStreamSender>();
private readonly AtomicBoolean closed = new AtomicBoolean();
private ClientException failureCause;
private readonly StreamSenderOptions options;
private readonly ClientSession session;
private readonly string senderId;
private readonly bool sendsSettled;
private readonly TaskCompletionSource<IStreamSender> openFuture = new TaskCompletionSource<IStreamSender>();
private readonly TaskCompletionSource<IStreamSender> closeFuture = new TaskCompletionSource<IStreamSender>();
private Engine.ISender protonSender;
private Action<ISender> senderRemotelyClosedHandler;
private volatile ISource remoteSource;
private volatile ITarget remoteTarget;
internal ClientStreamSender(ClientSession session, StreamSenderOptions options, string senderId, Engine.ISender protonSender)
{
this.options = new StreamSenderOptions(options);
this.session = session;
this.senderId = senderId;
this.protonSender = protonSender;
this.protonSender.LinkedResource = this;
this.sendsSettled = protonSender.SenderSettleMode == Types.Transport.SenderSettleMode.Settled;
}
public IClient Client => session.Client;
public IConnection Connection => session.Connection;
public ISession Session => session;
public Task<ISender> OpenTask => throw new NotImplementedException();
public string Address
{
get
{
if (IsDynamic)
{
WaitForOpenToComplete();
return (protonSender.RemoteTerminus as ITarget)?.Address;
}
else
{
return protonSender.Target?.Address;
}
}
}
public ISource Source
{
get
{
WaitForOpenToComplete();
return remoteSource;
}
}
public ITarget Target
{
get
{
WaitForOpenToComplete();
return remoteTarget;
}
}
public IReadOnlyDictionary<string, object> Properties
{
get
{
WaitForOpenToComplete();
return ClientConversionSupport.ToStringKeyedMap(protonSender.RemoteProperties);
}
}
public IReadOnlyCollection<string> OfferedCapabilities
{
get
{
WaitForOpenToComplete();
return ClientConversionSupport.ToStringArray(protonSender.OfferedCapabilities);
}
}
public IReadOnlyCollection<string> DesiredCapabilities
{
get
{
WaitForOpenToComplete();
return ClientConversionSupport.ToStringArray(protonSender.DesiredCapabilities);
}
}
public IStreamSenderMessage BeginMessage(IDictionary<string, object> deliveryAnnotations = null)
{
CheckClosedOrFailed();
DeliveryAnnotations annotations = null;
if (deliveryAnnotations != null)
{
annotations = new DeliveryAnnotations(ClientConversionSupport.ToSymbolKeyedMap(deliveryAnnotations));
}
throw new NotImplementedException();
}
public void Close(IErrorCondition error = null)
{
throw new NotImplementedException();
}
public Task<ISender> CloseAsync(IErrorCondition error = null)
{
throw new NotImplementedException();
}
public void Detach(IErrorCondition error = null)
{
throw new NotImplementedException();
}
public Task<ISender> DetachAsync(IErrorCondition error = null)
{
throw new NotImplementedException();
}
public void Dispose()
{
throw new NotImplementedException();
}
public ITracker Send<T>(IMessage<T> message, IDictionary<string, object> deliveryAnnotations = null)
{
throw new NotImplementedException();
}
public ITracker TrySend<T>(IMessage<T> message, IDictionary<string, object> deliveryAnnotations = null)
{
throw new NotImplementedException();
}
#region Internal Stream Sender API
internal string SenderId => senderId;
internal bool IsClosed => closed;
internal bool IsDynamic => protonSender.Target?.Dynamic ?? false;
internal Engine.ISender ProtonSender => protonSender;
internal StreamSenderOptions Options => options;
internal ClientStreamSender Open()
{
// TODO
return this;
}
internal void Disposition(IOutgoingDelivery delivery, Types.Transport.IDeliveryState state, bool settled)
{
CheckClosedOrFailed();
// TODO
// executor.execute(() -> {
// delivery.disposition(state, settled);
// });
}
internal void Abort(IOutgoingDelivery delivery, ClientStreamTracker tracker)
{
throw new NotImplementedException();
}
internal IStreamTracker SendMessage<E>(ClientStreamSenderMessage context, IAdvancedMessage<E> message)
{
throw new NotImplementedException();
}
internal void Complete(IOutgoingDelivery delivery, ClientStreamTracker tracker)
{
throw new NotImplementedException();
}
#endregion
#region Private Receiver Implementation
private void CheckClosedOrFailed()
{
if (IsClosed)
{
throw new ClientIllegalStateException("The Stream Sender was explicitly closed", failureCause);
}
else if (failureCause != null)
{
throw failureCause;
}
}
private void WaitForOpenToComplete()
{
if (!openFuture.Task.IsCompleted || openFuture.Task.IsFaulted)
{
try
{
openFuture.Task.Wait();
}
catch (Exception e)
{
throw failureCause ?? ClientExceptionSupport.CreateNonFatalOrPassthrough(e);
}
}
}
#endregion
}
}