blob: b832043b9690b39e98518b0c45625676d407025f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Apache.Qpid.Proton.Client.Exceptions;
using Apache.Qpid.Proton.Client.Concurrent;
using Apache.Qpid.Proton.Client.Utilities;
using Apache.Qpid.Proton.Logging;
namespace Apache.Qpid.Proton.Client.Implementation
{
/// <summary>
/// Client session that wraps a proton session instance and provides the higher level
/// clint API for managing links and creating session scoped transaction instances.
/// </summary>
public class ClientSession : ISession
{
private static IProtonLogger LOG = ProtonLoggerFactory.GetLogger<ClientSession>();
private static readonly IClientTransactionContext NoOpTransactionContext = new ClientNoOpTransactionContext();
private const long INFINITE = -1;
private readonly SessionOptions options;
private readonly ClientConnection connection;
private readonly string sessionId;
private readonly AtomicBoolean closed = new AtomicBoolean();
private readonly ClientSenderBuilder senderBuilder;
private readonly ClientReceiverBuilder receiverBuilder;
private readonly TaskCompletionSource<ISession> openFuture = new TaskCompletionSource<ISession>();
private readonly TaskCompletionSource<ISession> closeFuture = new TaskCompletionSource<ISession>();
private IClientTransactionContext txnContext = NoOpTransactionContext;
private Engine.ISession protonSession;
private ClientException failureCause;
public ClientSession(ClientConnection connection, SessionOptions options, string sessionId, Engine.ISession session)
{
this.options = new SessionOptions(options);
this.connection = connection;
this.protonSession = session;
this.sessionId = sessionId;
this.senderBuilder = new ClientSenderBuilder(this);
this.receiverBuilder = new ClientReceiverBuilder(this);
}
public IClient Client => connection.Client;
public IConnection Connection => connection;
public Task<ISession> OpenTask => openFuture.Task;
public IReadOnlyDictionary<string, object> Properties
{
get
{
WaitForOpenToComplete();
return ClientConversionSupport.ToStringKeyedMap(protonSession.RemoteProperties);
}
}
public IReadOnlyCollection<string> OfferedCapabilities
{
get
{
WaitForOpenToComplete();
return ClientConversionSupport.ToStringArray(protonSession.RemoteOfferedCapabilities);
}
}
public IReadOnlyCollection<string> DesiredCapabilities
{
get
{
WaitForOpenToComplete();
return ClientConversionSupport.ToStringArray(protonSession.RemoteDesiredCapabilities);
}
}
public void Dispose()
{
try
{
Close();
}
catch (Exception)
{
}
}
public void Close(IErrorCondition error = null)
{
try
{
CloseAsync(error).Wait();
}
catch (Exception)
{
}
}
public Task<ISession> CloseAsync(IErrorCondition error = null)
{
return DoClose(error);
}
public virtual IReceiver OpenDurableReceiver(string address, string subscriptionName, ReceiverOptions options = null)
{
CheckClosedOrFailed();
Objects.RequireNonNull(address, "Cannot create a durable receiver with a null address");
Objects.RequireNonNull(address, "Cannot create a durable receiver with a null subscription name");
TaskCompletionSource<IReceiver> createReceiver = new TaskCompletionSource<IReceiver>();
connection.Execute(() =>
{
try
{
CheckClosedOrFailed();
_ = createReceiver.TrySetResult(InternalOpenDurableReceiver(address, subscriptionName, options));
}
catch (Exception error)
{
_ = createReceiver.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(error));
}
});
return connection.Request(this, createReceiver).Task.GetAwaiter().GetResult();
}
public virtual IReceiver OpenDynamicReceiver(ReceiverOptions options = null, IDictionary<string, object> dynamicNodeProperties = null)
{
CheckClosedOrFailed();
TaskCompletionSource<IReceiver> createReceiver = new TaskCompletionSource<IReceiver>();
connection.Execute(() =>
{
try
{
CheckClosedOrFailed();
_ = createReceiver.TrySetResult(InternalOpenDynamicReceiver(dynamicNodeProperties, options));
}
catch (Exception error)
{
_ = createReceiver.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(error));
}
});
return connection.Request(this, createReceiver).Task.GetAwaiter().GetResult();
}
public virtual IReceiver OpenReceiver(string address, ReceiverOptions options = null)
{
CheckClosedOrFailed();
Objects.RequireNonNull(address, "Cannot create a receiver with a null address");
TaskCompletionSource<IReceiver> createReceiver = new TaskCompletionSource<IReceiver>();
connection.Execute(() =>
{
try
{
CheckClosedOrFailed();
_ = createReceiver.TrySetResult(InternalOpenReceiver(address, options));
}
catch (Exception error)
{
_ = createReceiver.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(error));
}
});
return connection.Request(this, createReceiver).Task.GetAwaiter().GetResult();
}
public virtual ISender OpenAnonymousSender(SenderOptions options = null)
{
CheckClosedOrFailed();
TaskCompletionSource<ISender> createSender = new TaskCompletionSource<ISender>();
connection.Execute(() =>
{
try
{
CheckClosedOrFailed();
_ = createSender.TrySetResult(InternalOpenAnonymousSender(options));
}
catch (Exception error)
{
_ = createSender.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(error));
}
});
return connection.Request(this, createSender).Task.GetAwaiter().GetResult();
}
public virtual ISender OpenSender(string address, SenderOptions options = null)
{
CheckClosedOrFailed();
Objects.RequireNonNull(address, "Cannot create a sender with a null address");
TaskCompletionSource<ISender> createSender = new TaskCompletionSource<ISender>();
connection.Execute(() =>
{
try
{
CheckClosedOrFailed();
_ = createSender.TrySetResult(InternalOpenSender(address, options));
}
catch (Exception error)
{
_ = createSender.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(error));
}
});
return connection.Request(this, createSender).Task.GetAwaiter().GetResult();
}
public ISession BeginTransaction()
{
CheckClosedOrFailed();
TaskCompletionSource<ISession> beginFuture = new TaskCompletionSource<ISession>();
connection.Execute(() =>
{
try
{
CheckClosedOrFailed();
if (txnContext == NoOpTransactionContext)
{
txnContext = new ClientLocalTransactionContext(this);
}
txnContext.Begin(beginFuture);
}
catch (Exception error)
{
_ = beginFuture.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(error));
}
});
return connection.Request(this, beginFuture).Task.GetAwaiter().GetResult();
}
public ISession CommitTransaction()
{
CheckClosedOrFailed();
TaskCompletionSource<ISession> commitFuture = new TaskCompletionSource<ISession>();
connection.Execute(() =>
{
try
{
CheckClosedOrFailed();
txnContext.Commit(commitFuture, false);
}
catch (Exception error)
{
_ = commitFuture.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(error));
}
});
return connection.Request(this, commitFuture).Task.GetAwaiter().GetResult();
}
public ISession RollbackTransaction()
{
CheckClosedOrFailed();
TaskCompletionSource<ISession> rollbackFuture = new TaskCompletionSource<ISession>();
connection.Execute(() =>
{
try
{
CheckClosedOrFailed();
txnContext.Rollback(rollbackFuture, false);
}
catch (Exception error)
{
_ = rollbackFuture.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(error));
}
});
return connection.Request(this, rollbackFuture).Task.GetAwaiter().GetResult();
}
#region Internal client session API
internal void CheckClosedOrFailed()
{
if (IsClosed)
{
throw new ClientIllegalStateException("The Session was explicitly closed", failureCause);
}
else if (failureCause != null)
{
throw failureCause;
}
}
internal string SessionId => sessionId;
internal Engine.ISession ProtonSession => protonSession;
internal bool IsClosed => closed;
internal SessionOptions Options => options;
internal IClientTransactionContext TransactionContext => txnContext;
internal void Execute(Action action) => connection.Execute(action);
internal void Schedule(Action action, TimeSpan delay) => connection.Schedule(action, delay);
internal ClientSession Open()
{
protonSession.LocalOpenHandler(HandleLocalOpen)
.LocalCloseHandler(HandleLocalClose)
.OpenHandler(HandleRemoteOpen)
.CloseHandler(HandleRemoteClose)
.EngineShutdownHandler(HandleEngineShutdown);
try
{
protonSession.Open();
}
catch (Exception)
{
// Connection is responding to all engine failed errors
}
return this;
}
internal void ScheduleRequestTimeout<T>(TaskCompletionSource<T> request, long timeout, Func<ClientException> errorSupplier)
{
if (timeout != INFINITE)
{
connection.Schedule(() =>
_ = request.TrySetException(errorSupplier.Invoke()), TimeSpan.FromMilliseconds(timeout));
}
else
{
// TODO return null;
}
}
internal TaskCompletionSource<T> Request<T>(Object requestor, TaskCompletionSource<T> request)
{
return connection.Request(requestor, request);
}
internal ClientReceiver InternalOpenReceiver(string address, ReceiverOptions receiverOptions)
{
return receiverBuilder.Receiver(address, receiverOptions).Open();
}
internal ClientStreamReceiver InternalOpenStreamReceiver(string address, StreamReceiverOptions receiverOptions)
{
return receiverBuilder.StreamReceiver(address, receiverOptions).Open();
}
internal ClientReceiver InternalOpenDurableReceiver(string address, string subscriptionName, ReceiverOptions receiverOptions)
{
return receiverBuilder.DurableReceiver(address, subscriptionName, receiverOptions).Open();
}
internal ClientReceiver InternalOpenDynamicReceiver(IDictionary<string, object> dynamicNodeProperties, ReceiverOptions receiverOptions)
{
return receiverBuilder.DynamicReceiver(dynamicNodeProperties, receiverOptions).Open();
}
internal ClientSender InternalOpenSender(string address, SenderOptions senderOptions)
{
return senderBuilder.Sender(address, senderOptions).Open();
}
internal ClientSender InternalOpenAnonymousSender(SenderOptions senderOptions = null)
{
// When the connection is opened we are ok to check that the anonymous relay is supported
// and open the sender if so, otherwise we need to wait.
if (connection.HasOpened)
{
connection.CheckAnonymousRelaySupported();
return senderBuilder.AnonymousSender(senderOptions).Open();
}
else
{
return senderBuilder.AnonymousSender(senderOptions);
}
}
internal ClientStreamSender InternalOpenStreamSender(string address, StreamSenderOptions senderOptions)
{
return senderBuilder.StreamSender(address, senderOptions).Open();
}
#endregion
#region Session private API
private Task<ISession> DoClose(IErrorCondition error)
{
if (closed.CompareAndSet(false, true))
{
// Already closed by failure or shutdown so no need to
if (!closeFuture.Task.IsCompleted)
{
connection.Execute(() =>
{
if (protonSession.IsLocallyOpen)
{
try
{
protonSession.ErrorCondition = ClientErrorCondition.AsProtonErrorCondition(error);
protonSession.Close();
}
catch (Exception)
{
// Allow engine error handler to deal with this
}
}
});
}
}
return closeFuture.Task;
}
private Engine.ISession ConfigureSession(Engine.ISession protonSession)
{
protonSession.LinkedResource = this;
protonSession.OfferedCapabilities = ClientConversionSupport.ToSymbolArray(options.OfferedCapabilities);
protonSession.DesiredCapabilities = ClientConversionSupport.ToSymbolArray(options.DesiredCapabilities);
protonSession.Properties = ClientConversionSupport.ToSymbolKeyedMap(options.Properties);
return protonSession;
}
private void WaitForOpenToComplete()
{
if (!openFuture.Task.IsCompleted || openFuture.Task.IsFaulted)
{
try
{
openFuture.Task.Wait();
}
catch (Exception e)
{
throw failureCause ?? ClientExceptionSupport.CreateNonFatalOrPassthrough(e);
}
}
}
private void ImmediateSessionShutdown(ClientException failureCause)
{
if (this.failureCause == null)
{
this.failureCause = failureCause;
}
try
{
protonSession.Close();
}
catch (Exception)
{
}
if (failureCause != null)
{
_ = openFuture.TrySetException(failureCause);
}
else
{
_ = openFuture.TrySetResult(this);
}
_ = closeFuture.TrySetResult(this);
}
#endregion
#region Session Proton event handling API
private void HandleLocalOpen(Engine.ISession session)
{
if (options.OpenTimeout > 0)
{
connection.Schedule(() =>
{
if (!openFuture.Task.IsCompleted)
{
ImmediateSessionShutdown(
new ClientOperationTimedOutException("Session open timed out waiting for remote to respond"));
}
}, TimeSpan.FromMilliseconds(options.OpenTimeout));
}
}
private void HandleLocalClose(Engine.ISession session)
{
// If not yet remotely closed we only wait for a remote close if the engine isn't
// already failed and we have successfully opened the session without a timeout.
if (session.IsRemotelyOpen && failureCause == null && !session.Engine.IsShutdown)
{
long timeout = options.CloseTimeout;
if (timeout > 0)
{
ScheduleRequestTimeout(closeFuture, timeout, () =>
new ClientOperationTimedOutException("Session close timed out waiting for remote to respond"));
}
}
else
{
ImmediateSessionShutdown(failureCause);
}
}
private void HandleRemoteOpen(Engine.ISession session)
{
openFuture.TrySetResult(this);
LOG.Trace("Session:{0} opened successfully.", SessionId);
foreach (Engine.ISender sender in protonSession.Senders)
{
if (!sender.IsLocallyOpen)
{
// TODO Client Stream sender not accounted for yet
ClientSender clientSender = (ClientSender)sender.LinkedResource;
if (connection.IsAnonymousRelaySupported)
{
clientSender.Open();
}
else
{
clientSender.HandleUpdateAnonymousRelayNotSupported();
}
}
}
}
private void HandleRemoteClose(Engine.ISession session)
{
if (session.IsLocallyOpen)
{
ImmediateSessionShutdown(ClientExceptionSupport.ConvertToSessionClosedException(session.RemoteErrorCondition));
}
else
{
ImmediateSessionShutdown(failureCause);
}
}
private void HandleEngineShutdown(Engine.IEngine engine)
{
// If the connection has an engine that is running then it is going to attempt
// reconnection and we want to recover by creating a new Session that will be
// opened once the remote has been recovered.
if (!connection.ProtonEngine.IsShutdown)
{
// No local close processing needed but we should try and let the session
// clean up any resources it can by closing it.
protonSession.LocalCloseHandler(null);
protonSession.Close();
protonSession = ConfigureSession(ClientSessionBuilder.RecreateSession(connection, protonSession, options));
Open();
}
else
{
Engine.IConnection connection = engine.Connection;
ClientException failureCause;
if (connection.RemoteErrorCondition != null)
{
failureCause = ClientExceptionSupport.ConvertToConnectionClosedException(connection.RemoteErrorCondition);
}
else if (engine.FailureCause != null)
{
failureCause = ClientExceptionSupport.ConvertToConnectionClosedException(engine.FailureCause);
}
else if (!IsClosed)
{
failureCause = new ClientConnectionRemotelyClosedException("Remote closed without a specific error condition");
}
else
{
failureCause = null;
}
ImmediateSessionShutdown(failureCause);
}
}
#endregion
}
}