blob: 2643c771b5c844ec9205473cc14359db310aa765 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Apache.Qpid.Proton.Buffer;
using Apache.Qpid.Proton.Client.Exceptions;
using Apache.Qpid.Proton.Client.Concurrent;
using Apache.Qpid.Proton.Engine;
using Apache.Qpid.Proton.Engine.Exceptions;
using Apache.Qpid.Proton.Types.Messaging;
using Apache.Qpid.Proton.Types.Transport;
using Apache.Qpid.Proton.Utilities;
namespace Apache.Qpid.Proton.Client.Implementation
{
/// <summary>
/// The stream delivery type manages the underlying state of an incoming
/// streaming message delivery and provides the stream type used to read
/// and block for reads when not all requested message data has arrived.
/// The delivery will also manage settlement of a streaming delivery and
/// apply receiver configuration rules like auto settlement to the delivery
/// as incoming portions of the message arrive.
/// </summary>
public class ClientStreamDelivery : IStreamDelivery
{
private readonly ClientStreamReceiver receiver;
private readonly IIncomingDelivery protonDelivery;
private ClientStreamReceiverMessage message;
private RawDeliveryInputStream rawInputStream;
internal ClientStreamDelivery(ClientStreamReceiver receiver, IIncomingDelivery protonDelivery)
{
this.receiver = receiver;
this.protonDelivery = protonDelivery;
this.protonDelivery.LinkedResource = this;
// Capture inbound events and route to an active stream or message
protonDelivery.DeliveryReadHandler(HandleDeliveryRead)
.DeliveryAbortedHandler(HandleDeliveryAborted);
}
public IStreamReceiver Receiver => receiver;
public bool Aborted => protonDelivery.IsAborted;
public bool Completed => !protonDelivery.IsPartial;
public uint MessageFormat => protonDelivery.MessageFormat;
public bool Settled => protonDelivery.IsSettled;
public IDeliveryState State => protonDelivery.State?.ToClientDeliveryState();
public bool RemoteSettled => protonDelivery.IsRemotelySettled;
public IDeliveryState RemoteState => protonDelivery.RemoteState.ToClientDeliveryState();
public IReadOnlyDictionary<string, object> Annotations
{
get
{
if (rawInputStream != null && message == null)
{
throw new ClientIllegalStateException("Cannot access Delivery Annotations API after requesting an InputStream");
}
return ClientConversionSupport.ToStringKeyedMap(
((ClientStreamReceiverMessage)Message()).DeliveryAnnotations?.Value);
}
}
public Stream RawInputStream
{
get
{
if (message != null)
{
throw new ClientIllegalStateException("Cannot access Delivery InputStream API after requesting an Message");
}
if (rawInputStream == null)
{
rawInputStream = new RawDeliveryInputStream(this);
}
return rawInputStream;
}
}
public IStreamReceiverMessage Message()
{
if (rawInputStream != null && message == null)
{
throw new ClientIllegalStateException("Cannot access Delivery Message API after requesting an InputStream");
}
if (message == null)
{
message = new ClientStreamReceiverMessage(receiver, this, rawInputStream = new RawDeliveryInputStream(this));
}
return message;
}
public IStreamDelivery Accept()
{
receiver.Disposition(protonDelivery, Accepted.Instance, true);
return this;
}
public IStreamDelivery Disposition(IDeliveryState state, bool settled)
{
receiver.Disposition(protonDelivery, state?.AsProtonType(), true);
return this;
}
public IStreamDelivery Modified(bool deliveryFailed, bool undeliverableHere)
{
receiver.Disposition(protonDelivery, new Modified(deliveryFailed, undeliverableHere), true);
return this;
}
public IStreamDelivery Reject(string condition, string description)
{
receiver.Disposition(protonDelivery, new Rejected(new ErrorCondition(condition, description)), true);
return this;
}
public IStreamDelivery Release()
{
receiver.Disposition(protonDelivery, Released.Instance, true);
return this;
}
public IStreamDelivery Settle()
{
receiver.Disposition(protonDelivery, null, true);
return this;
}
#region Internal Stream Delivery API
internal IIncomingDelivery ProtonDelivery => protonDelivery;
internal void HandleReceiverClosed(ClientStreamReceiver receiver)
{
rawInputStream?.HandleReceiverClosed(receiver);
}
#endregion
#region private stream delivery implementation
private void HandleDeliveryRead(IIncomingDelivery delivery)
{
rawInputStream?.HandleDeliveryRead(delivery);
}
private void HandleDeliveryAborted(IIncomingDelivery delivery)
{
rawInputStream?.HandleDeliveryAborted(delivery);
}
#endregion
#region Raw incoming byte stream message
private class RawDeliveryInputStream : Stream
{
private const int INVALID_MARK = -1;
private const int DEFAULT_MARK_LIMIT = 1024;
private readonly AtomicBoolean closed = false;
private readonly ClientStreamDelivery delivery;
private readonly ClientStreamReceiver receiver;
private readonly ClientSession session;
private readonly ClientConnection connection;
private readonly Engine.IIncomingDelivery protonDelivery;
private readonly IProtonCompositeBuffer buffer = IProtonCompositeBuffer.Compose();
private TaskCompletionSource<int> readRequest;
private long markIndex = INVALID_MARK;
private int markLimit;
public RawDeliveryInputStream(ClientStreamDelivery delivery)
{
this.delivery = delivery;
this.receiver = delivery.receiver;
this.protonDelivery = delivery.protonDelivery;
this.session = (ClientSession)delivery.receiver.Session;
this.connection = (ClientConnection)delivery.receiver.Session.Connection;
}
public override bool CanRead => true;
public override bool CanSeek => true;
public override bool CanWrite => false;
public override long Length
{
get
{
CheckStreamStateIsValid();
if (buffer.IsReadable)
{
return buffer.ReadableBytes;
}
else
{
TaskCompletionSource<int> request = new TaskCompletionSource<int>();
try
{
connection.Execute(() =>
{
if (protonDelivery.Available > 0)
{
buffer.Append(protonDelivery.ReadAll());
}
request.TrySetResult((int)buffer.ReadableBytes);
});
return connection.Request(receiver, request).Task.Result;
}
catch (Exception e)
{
throw new IOException("Error getting available bytes from incoming delivery", e);
}
}
}
}
public override long Position
{
get => markIndex = buffer.ReadOffset;
set => Seek(value, SeekOrigin.Begin);
}
public override void Close()
{
markLimit = 0;
markIndex = INVALID_MARK;
if (closed.CompareAndSet(false, true))
{
try
{
TaskCompletionSource<bool> closeRequest = new TaskCompletionSource<bool>();
connection.Execute(() =>
{
AutoAcceptDeliveryIfNecessary();
// If the deliver wasn't fully read either because there are remaining
// bytes locally we need to discard those to aid in retention avoidance.
// and to potentially open the session window to allow for fully reading
// and discarding any inbound bytes that remain.
try
{
_ = protonDelivery.ReadAll();
}
catch (EngineFailedException)
{
// Ignore as engine is down and we cannot read any more
}
// Clear anything that wasn't yet read and then clear any pending read request as EOF
buffer.WriteOffset = buffer.Capacity;
buffer.ReadOffset = buffer.Capacity;
buffer.Compact();
if (readRequest != null)
{
readRequest.TrySetResult(-1);
readRequest = null;
}
closeRequest.TrySetResult(true);
});
connection.Request(receiver, closeRequest);
}
finally
{
base.Close();
}
}
}
public override void Flush()
{
// Nothing to do here for incoming raw message stream.
}
public override int ReadByte()
{
CheckStreamStateIsValid();
int result = -1;
while (true)
{
if (buffer.IsReadable)
{
result = buffer.ReadUnsignedByte() & 0xff;
TryReleaseReadBuffers();
break;
}
else if (RequestMoreData() < 0)
{
break;
}
}
return result;
}
public override int Read(byte[] target, int offset, int length)
{
CheckStreamStateIsValid();
Statics.CheckFromIndexSize(offset, length, (int)buffer.ReadableBytes);
int remaining = length;
int bytesRead = 0;
if (length <= 0)
{
return 0;
}
while (remaining > 0)
{
if (buffer.IsReadable)
{
if (buffer.ReadableBytes < remaining)
{
int readTarget = (int)buffer.ReadableBytes;
buffer.CopyInto(buffer.ReadOffset, target, offset + bytesRead, buffer.ReadableBytes);
buffer.ReadOffset = buffer.WriteOffset;
bytesRead += readTarget;
remaining -= readTarget;
}
else
{
buffer.CopyInto(buffer.ReadOffset, target, offset + bytesRead, remaining);
buffer.ReadOffset += remaining;
bytesRead += remaining;
remaining = 0;
}
TryReleaseReadBuffers();
}
else if (RequestMoreData() < 0)
{
return bytesRead > 0 ? bytesRead : -1;
}
}
return bytesRead;
}
public override long Seek(long offset, SeekOrigin origin)
{
switch(origin)
{
// TODO
case SeekOrigin.Begin:
case SeekOrigin.Current:
case SeekOrigin.End:
break;
}
throw new NotImplementedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException("Cannot set length an a message delivery incoming bytes stream");
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException("Cannot write to an a message delivery incoming bytes stream");
}
#region Delivery event handlers
internal void HandleDeliveryRead(IIncomingDelivery delivery)
{
if (closed)
{
// Clear any pending data to expand session window if not yet complete
_ = delivery.ReadAll();
}
else
{
// An input stream is awaiting some more incoming bytes, check to see if
// the delivery had a non-empty transfer frame and provide them.
if (readRequest != null)
{
if (delivery.Available > 0)
{
buffer.Append(protonDelivery.ReadAll());
readRequest.TrySetResult((int)buffer.ReadableBytes);
readRequest = null;
}
else if (!delivery.IsPartial)
{
AutoAcceptDeliveryIfNecessary();
readRequest.TrySetResult(-1);
readRequest = null;
}
}
}
}
internal void HandleDeliveryAborted(IIncomingDelivery delivery)
{
readRequest?.TrySetException(new ClientDeliveryAbortedException("The remote sender has aborted this delivery"));
delivery.Settle();
}
internal void HandleReceiverClosed(ClientStreamReceiver receiver)
{
readRequest?.TrySetException(new ClientResourceRemotelyClosedException("The receiver link has been remotely closed."));
}
#endregion
#region Private APIs for internal Stream use
private void TryReleaseReadBuffers()
{
if ((buffer.ReadOffset - markIndex) > markLimit)
{
markIndex = INVALID_MARK;
buffer.Compact();
}
}
private int RequestMoreData()
{
TaskCompletionSource<int> request = new TaskCompletionSource<int>();
try
{
connection.Execute(() =>
{
if (protonDelivery.Receiver.IsLocallyClosedOrDetached)
{
request.TrySetException(new ClientException("Cannot read from delivery due to link having been closed"));
}
else if (protonDelivery.Available > 0)
{
buffer.Append(protonDelivery.ReadAll());
request.TrySetResult((int)buffer.ReadableBytes);
}
else if (protonDelivery.IsAborted)
{
request.TrySetException(new ClientDeliveryAbortedException("The remote sender has aborted this delivery"));
}
else if (!protonDelivery.IsPartial)
{
AutoAcceptDeliveryIfNecessary();
request.TrySetResult(-1);
}
else
{
readRequest = request;
}
});
return connection.Request(receiver, request).Task.Result;
}
catch (Exception e)
{
throw new IOException("Error reading requested data", e);
}
}
private void AutoAcceptDeliveryIfNecessary()
{
if (receiver.ReceiverOptions.AutoAccept && !protonDelivery.IsSettled)
{
if (!buffer.IsReadable && protonDelivery.Available == 0 &&
(protonDelivery.IsAborted || !protonDelivery.IsPartial))
{
try
{
receiver.Disposition(protonDelivery, Accepted.Instance, receiver.ReceiverOptions.AutoSettle);
}
catch (Exception)
{
// TODO : LOG.trace("Caught error while attempting to auto accept the fully read delivery.", error);
}
}
}
}
private void CheckStreamStateIsValid()
{
if (closed)
{
throw new IOException("The InputStream has been explicitly closed");
}
if (receiver.IsClosed)
{
throw new IOException("Underlying receiver has closed", receiver.FailureCause);
}
}
#endregion
}
#endregion
}
}