// Source blob: cac623d951840a83289fd8fa2aac15535f833f97
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.IO;
using Apache.Qpid.Proton.Buffer;
using Apache.Qpid.Proton.Client.Concurrent;
using Apache.Qpid.Proton.Client.Exceptions;
using Apache.Qpid.Proton.Codec;
using Apache.Qpid.Proton.Types;
using Apache.Qpid.Proton.Types.Messaging;
namespace Apache.Qpid.Proton.Client.Implementation
{
public sealed class ClientStreamSenderMessage : IStreamSenderMessage
{
// Number of bytes allocated for an encoded Data section header: the four preamble
// bytes declared below followed by the size value written after them.
private static readonly int DATA_SECTION_HEADER_ENCODING_SIZE = 8;
// Standard encoding data for a Data Section (a four byte size must be written after
// these bytes and before the actual data).
private static readonly byte[] DATA_SECTION_PREAMBLE = { (byte)EncodingCodes.DescribedTypeIndicator,
(byte)EncodingCodes.SmallULong,
(byte)Data.DescriptorCode,
(byte)EncodingCodes.VBin32 };
// Stream sender that owns this message; sends, aborts and completions are routed through it.
private readonly ClientStreamSender sender;
// Delivery annotations supplied at construction; encoded with the other preamble
// sections the first time the message transitions to a writable state.
private readonly DeliveryAnnotations deliveryAnnotations;
// Capacity used for body stream buffers and the threshold of buffered bytes at which
// the pending data is handed to the sender.
private readonly uint writeBufferSize;
// Tracker for the delivery of this message; only exposed through Tracker once completed.
private readonly ClientStreamTracker tracker;
// Optional message sections; each stays null until assigned or lazily created by a setter.
private Header header;
private MessageAnnotations annotations;
private Properties properties;
private ApplicationProperties applicationProperties;
private Footer footer;
// Encoded bytes accumulated but not yet passed to the sender; null when nothing is pending.
private IProtonBuffer buffer;
// Message format value passed along with every chunk handed to the sender.
private volatile uint messageFormat;
// Current position in the message lifecycle, see StreamState.
private StreamState currentState = StreamState.PREAMBLE;
/// <summary>
/// Creates a new streaming message bound to the given sender and tracker.
/// </summary>
/// <param name="sender">The stream sender that will transmit this message.</param>
/// <param name="tracker">The tracker associated with the delivery of this message.</param>
/// <param name="deliveryAnnotations">Optional delivery annotations encoded with the message preamble (may be null).</param>
internal ClientStreamSenderMessage(ClientStreamSender sender, ClientStreamTracker tracker, DeliveryAnnotations deliveryAnnotations)
{
   this.sender = sender;
   this.tracker = tracker;
   this.deliveryAnnotations = deliveryAnnotations;

   // An explicitly configured write buffer size wins, otherwise the connection max
   // frame size is used; either way the value never drops below the minimum limit.
   var configuredSize = sender.Options.WriteBufferSize;

   writeBufferSize = Math.Max(
      StreamSenderOptions.MIN_BUFFER_SIZE_LIMIT,
      configuredSize > 0 ? configuredSize : (uint)sender.ProtonSender.Connection.MaxFrameSize);
}
/// <summary>
/// The proton delivery currently exposed by the tracker assigned to this message.
/// </summary>
internal Engine.IOutgoingDelivery ProtonDelivery
{
   get { return tracker.ProtonDelivery; }
}

/// <summary>
/// The tracker for this message, which is only exposed once the message has been
/// completed; null is returned before that point.
/// </summary>
public IStreamTracker Tracker
{
   get
   {
      if (Completed)
      {
         return tracker;
      }

      return null;
   }
}

/// <summary>
/// The stream sender that created this message.
/// </summary>
public IStreamSender Sender
{
   get { return sender; }
}

/// <summary>
/// The message format value sent with the message. It can only be assigned while
/// the message is still in its preamble state.
/// </summary>
/// <exception cref="ClientIllegalStateException">If assigned after the preamble state has been left.</exception>
public uint MessageFormat
{
   get { return messageFormat; }
   set
   {
      CheckStreamState(StreamState.PREAMBLE, "Cannot set message format after body writes have started.");
      messageFormat = value;
   }
}

/// <summary>
/// True once this message has been marked as completed.
/// </summary>
public bool Completed
{
   get { return currentState == StreamState.COMPLETE; }
}

/// <summary>
/// True once this message has been marked as aborted.
/// </summary>
public bool Aborted
{
   get { return currentState == StreamState.ABORTED; }
}
/// <summary>
/// Marks this message as aborted and asks the sender to abort the delivery. Calling
/// this on an already aborted message does nothing.
/// </summary>
/// <returns>This message instance.</returns>
/// <exception cref="ClientIllegalStateException">If the message has already been completed.</exception>
public IStreamSenderMessage Abort()
{
   if (Completed)
   {
      throw new ClientIllegalStateException("Cannot abort an already completed send context");
   }

   if (Aborted)
   {
      return this;
   }

   // The state is switched before the sender is asked to abort the delivery.
   currentState = StreamState.ABORTED;
   sender.Abort(ProtonDelivery, tracker);

   return this;
}
/// <summary>
/// Marks this message as completed, writing the footer section first if one was
/// set. Calling this on an already completed message does nothing.
/// </summary>
/// <returns>This message instance.</returns>
/// <exception cref="ClientIllegalStateException">If the message has already been aborted.</exception>
public IStreamSenderMessage Complete()
{
   if (Aborted)
   {
      throw new ClientIllegalStateException("Cannot complete an already aborted send context");
   }

   if (Completed)
   {
      return this;
   }

   // The footer must be written before the state changes since section writes are
   // rejected on a completed message. Writing it can itself trigger a send when the
   // buffered bytes reach the write buffer size, leaving nothing buffered afterwards.
   if (footer != null)
   {
      Write(footer);
   }

   currentState = StreamState.COMPLETE;

   // Buffered data is flushed now that the state is COMPLETE, otherwise the sender
   // is asked directly to complete the delivery.
   bool hasBufferedData = buffer != null && buffer.IsReadable;

   if (hasBufferedData)
   {
      DoFlush();
   }
   else
   {
      sender.Complete(ProtonDelivery, tracker);
   }

   return this;
}
/// <summary>
/// Not supported: a streamed message is encoded incrementally as it is written and
/// cannot be encoded into a single buffer.
/// </summary>
/// <param name="deliveryAnnotations">Ignored.</param>
/// <exception cref="ClientUnsupportedOperationException">Always thrown.</exception>
public IProtonBuffer Encode(IDictionary<string, object> deliveryAnnotations) =>
   throw new ClientUnsupportedOperationException("StreamSenderMessage cannot be directly encoded");
/// <summary>
/// Creates a stream whose written bytes are forwarded as-is, without the stream
/// adding any section framing. The preamble sections are encoded first if the
/// message is still in its preamble state.
/// </summary>
/// <returns>A new write-only stream that controls the message state until closed.</returns>
/// <exception cref="ClientIllegalStateException">
/// If the message is completed, aborted, or another stream is currently active.
/// </exception>
public Stream RawOutputStream()
{
   switch (currentState)
   {
      case StreamState.COMPLETE:
         throw new ClientIllegalStateException("Cannot create an Stream from a completed send context");
      case StreamState.ABORTED:
         throw new ClientIllegalStateException("Cannot create an Stream from a aborted send context");
      case StreamState.BODY_WRITTING:
         throw new ClientIllegalStateException("Cannot add more body sections while an Stream is active");
   }

   TransitionToWritableState();

   var rawBuffer = ProtonByteBufferAllocator.Instance.Allocate(writeBufferSize, writeBufferSize);

   return new SendContextRawBytesOutputStream(this, rawBuffer);
}
#region AMQP Header access APIs

/// <summary>
/// The Header section of this message, null when none was assigned or created.
/// </summary>
/// <exception cref="ClientIllegalStateException">If assigned after the preamble state has been left.</exception>
public Header Header
{
   get { return header; }
   set
   {
      if (currentState != StreamState.PREAMBLE)
      {
         throw new ClientIllegalStateException("Cannot write to Message Header after body writing has started.");
      }

      header = value;
   }
}

// Each accessor below reads from the Header section when present and falls back to
// the Header default otherwise; each setter creates the Header section on demand and
// fails with ClientIllegalStateException once the preamble state has been left.

/// <summary>The durable flag of the message Header.</summary>
public bool Durable
{
   get { return header?.Durable ?? Header.DEFAULT_DURABILITY; }
   set { LazyCreateHeader().Durable = value; }
}

/// <summary>The priority value of the message Header.</summary>
public byte Priority
{
   get { return header?.Priority ?? Header.DEFAULT_PRIORITY; }
   set { LazyCreateHeader().Priority = value; }
}

/// <summary>The time to live value of the message Header.</summary>
public uint TimeToLive
{
   get { return header?.TimeToLive ?? Header.DEFAULT_TIME_TO_LIVE; }
   set { LazyCreateHeader().TimeToLive = value; }
}

/// <summary>The first acquirer flag of the message Header.</summary>
public bool FirstAcquirer
{
   get { return header?.FirstAcquirer ?? Header.DEFAULT_FIRST_ACQUIRER; }
   set { LazyCreateHeader().FirstAcquirer = value; }
}

/// <summary>The delivery count value of the message Header.</summary>
public uint DeliveryCount
{
   get { return header?.DeliveryCount ?? Header.DEFAULT_DELIVERY_COUNT; }
   set { LazyCreateHeader().DeliveryCount = value; }
}

#endregion
#region AMQP Properties access APIs

/// <summary>
/// The Properties section of this message, null when none was assigned or created.
/// </summary>
/// <exception cref="ClientIllegalStateException">If assigned after the preamble state has been left.</exception>
public Properties Properties
{
   get { return properties; }
   set
   {
      if (currentState != StreamState.PREAMBLE)
      {
         throw new ClientIllegalStateException("Cannot write to Message Properties after body writing has started.");
      }

      properties = value;
   }
}

// Each accessor below reads from the Properties section when present (null or zero
// otherwise); each setter creates the Properties section on demand and fails with
// ClientIllegalStateException once the preamble state has been left.

/// <summary>The message-id of the message.</summary>
public object MessageId
{
   get { return properties?.MessageId; }
   set { LazyCreateProperties().MessageId = value; }
}

/// <summary>
/// The user-id of the message as a byte array. The getter returns a fresh copy of
/// the readable bytes, or null when no readable user-id bytes are present.
/// </summary>
public byte[] UserId
{
   get
   {
      var userId = properties?.UserId;

      if (userId == null || userId.ReadableBytes <= 0)
      {
         return null;
      }

      byte[] copy = new byte[userId.ReadableBytes];
      userId.CopyInto(userId.ReadOffset, copy, 0, copy.LongLength);

      return copy;
   }
   set
   {
      // The given array is wrapped rather than copied.
      LazyCreateProperties().UserId = ProtonByteBufferAllocator.Instance.Wrap(value);
   }
}

/// <summary>The "to" address of the message.</summary>
public string To
{
   get { return properties?.To; }
   set { LazyCreateProperties().To = value; }
}

/// <summary>The subject of the message.</summary>
public string Subject
{
   get { return properties?.Subject; }
   set { LazyCreateProperties().Subject = value; }
}

/// <summary>The reply-to address of the message.</summary>
public string ReplyTo
{
   get { return properties?.ReplyTo; }
   set { LazyCreateProperties().ReplyTo = value; }
}

/// <summary>The correlation-id of the message.</summary>
public object CorrelationId
{
   get { return properties?.CorrelationId; }
   set { LazyCreateProperties().CorrelationId = value; }
}

/// <summary>The content type of the message.</summary>
public string ContentType
{
   get { return properties?.ContentType; }
   set { LazyCreateProperties().ContentType = value; }
}

/// <summary>The content encoding of the message.</summary>
public string ContentEncoding
{
   get { return properties?.ContentEncoding; }
   set { LazyCreateProperties().ContentEncoding = value; }
}

/// <summary>The absolute expiry time of the message, zero when no Properties section exists.</summary>
public ulong AbsoluteExpiryTime
{
   get { return properties?.AbsoluteExpiryTime ?? 0; }
   set { LazyCreateProperties().AbsoluteExpiryTime = value; }
}

/// <summary>The creation time of the message, zero when no Properties section exists.</summary>
public ulong CreationTime
{
   get { return properties?.CreationTime ?? 0; }
   set { LazyCreateProperties().CreationTime = value; }
}

/// <summary>The group-id of the message.</summary>
public string GroupId
{
   get { return properties?.GroupId; }
   set { LazyCreateProperties().GroupId = value; }
}

/// <summary>The group sequence of the message, zero when no Properties section exists.</summary>
public uint GroupSequence
{
   get { return properties?.GroupSequence ?? 0; }
   set { LazyCreateProperties().GroupSequence = value; }
}

/// <summary>The reply-to-group-id of the message.</summary>
public string ReplyToGroupId
{
   get { return properties?.ReplyToGroupId; }
   set { LazyCreateProperties().ReplyToGroupId = value; }
}

#endregion
#region AMQP Message Annotations access APIs

/// <summary>
/// The Message Annotations section of this message, null when none was assigned or created.
/// </summary>
/// <exception cref="ClientIllegalStateException">If assigned after the preamble state has been left.</exception>
public MessageAnnotations Annotations
{
   get { return annotations; }
   set
   {
      if (currentState != StreamState.PREAMBLE)
      {
         throw new ClientIllegalStateException("Cannot write to Message Annotations after body writing has started.");
      }

      annotations = value;
   }
}

/// <summary>
/// True when a Message Annotations section exists and holds at least one entry.
/// </summary>
public bool HasAnnotations
{
   get { return annotations?.Value?.Count > 0; }
}

/// <summary>
/// Invokes the consumer once for every message annotation, passing the key as a string.
/// </summary>
/// <param name="consumer">Callback receiving each annotation key and value.</param>
/// <returns>This message instance.</returns>
public IMessage<Stream> ForEachAnnotation(Action<string, object> consumer)
{
   if (!HasAnnotations)
   {
      return this;
   }

   foreach (KeyValuePair<Symbol, object> annotation in annotations.Value)
   {
      consumer(annotation.Key.ToString(), annotation.Value);
   }

   return this;
}

/// <summary>
/// Looks up the message annotation stored under the given key.
/// </summary>
/// <param name="key">The annotation key.</param>
/// <returns>The stored value, or null when there is no such annotation.</returns>
public object GetAnnotation(string key)
{
   var entries = annotations?.Value;

   if (entries != null && entries.TryGetValue(Symbol.Lookup(key), out object found))
   {
      return found;
   }

   return null;
}

/// <summary>
/// Stores a message annotation, creating the Message Annotations section on demand.
/// </summary>
/// <param name="key">The annotation key.</param>
/// <param name="value">The annotation value.</param>
/// <returns>This message instance.</returns>
/// <exception cref="ClientIllegalStateException">If called after the preamble state has been left.</exception>
public IMessage<Stream> SetAnnotation(string key, object value)
{
   var section = LazyCreateMessageAnnotations();
   section.Value[Symbol.Lookup(key)] = value;

   return this;
}

/// <summary>
/// Checks whether a message annotation is stored under the given key.
/// </summary>
/// <param name="key">The annotation key.</param>
/// <returns>True if an annotation with that key exists.</returns>
public bool HasAnnotation(string key)
{
   bool? present = annotations?.Value?.ContainsKey(Symbol.Lookup(key));

   return present ?? false;
}

/// <summary>
/// Removes the message annotation stored under the given key.
/// </summary>
/// <param name="key">The annotation key.</param>
/// <returns>The value that was stored, or null when there was no such annotation.</returns>
public object RemoveAnnotation(string key)
{
   if (!HasAnnotations)
   {
      return null;
   }

   var symbol = Symbol.Lookup(key);

   annotations.Value.TryGetValue(symbol, out object previous);
   annotations.Value.Remove(symbol);

   return previous;
}

#endregion
#region AMQP Application Properties access APIs

/// <summary>
/// The Application Properties section of this message, null when none was assigned or created.
/// </summary>
/// <exception cref="ClientIllegalStateException">If assigned after the preamble state has been left.</exception>
public ApplicationProperties ApplicationProperties
{
   get { return applicationProperties; }
   set
   {
      if (currentState != StreamState.PREAMBLE)
      {
         throw new ClientIllegalStateException("Cannot write to Message Application Properties after body writing has started.");
      }

      applicationProperties = value;
   }
}

/// <summary>
/// True when an Application Properties section exists and holds at least one entry.
/// </summary>
public bool HasProperties
{
   get { return applicationProperties?.Value?.Count > 0; }
}

/// <summary>
/// Invokes the consumer once for every application property.
/// </summary>
/// <param name="consumer">Callback receiving each property key and value.</param>
/// <returns>This message instance.</returns>
public IMessage<Stream> ForEachProperty(Action<string, object> consumer)
{
   if (!HasProperties)
   {
      return this;
   }

   foreach (KeyValuePair<string, object> property in applicationProperties.Value)
   {
      consumer(property.Key, property.Value);
   }

   return this;
}

/// <summary>
/// Looks up the application property stored under the given key.
/// </summary>
/// <param name="key">The property key.</param>
/// <returns>The stored value, or null when there is no such property.</returns>
public object GetProperty(string key)
{
   var entries = applicationProperties?.Value;

   if (entries != null && entries.TryGetValue(key, out object found))
   {
      return found;
   }

   return null;
}

/// <summary>
/// Stores an application property, creating the Application Properties section on demand.
/// </summary>
/// <param name="key">The property key.</param>
/// <param name="value">The property value.</param>
/// <returns>This message instance.</returns>
/// <exception cref="ClientIllegalStateException">If called after the preamble state has been left.</exception>
public IMessage<Stream> SetProperty(string key, object value)
{
   var section = LazyCreateApplicationProperties();
   section.Value[key] = value;

   return this;
}

/// <summary>
/// Checks whether an application property is stored under the given key.
/// </summary>
/// <param name="key">The property key.</param>
/// <returns>True if a property with that key exists.</returns>
public bool HasProperty(string key)
{
   bool? present = applicationProperties?.Value?.ContainsKey(key);

   return present ?? false;
}

/// <summary>
/// Removes the application property stored under the given key.
/// </summary>
/// <param name="key">The property key.</param>
/// <returns>The value that was stored, or null when there was no such property.</returns>
public object RemoveProperty(string key)
{
   if (!HasProperties)
   {
      return null;
   }

   applicationProperties.Value.TryGetValue(key, out object previous);
   applicationProperties.Value.Remove(key);

   return previous;
}

#endregion
#region AMQP Footer access APIs

/// <summary>
/// The Footer section of this message, null when none was assigned or created. Unlike
/// the preamble sections the footer may still be assigned while the body is being
/// written, since it is only encoded when the message is completed.
/// </summary>
/// <exception cref="ClientIllegalStateException">If assigned after the message was completed or aborted.</exception>
public Footer Footer
{
   get { return footer; }
   set
   {
      // COMPLETE and ABORTED are the only states at or beyond COMPLETE.
      if (Completed || Aborted)
      {
         throw new ClientIllegalStateException(
            "Cannot write to Message Footer after message has been marked completed or aborted.");
      }

      footer = value;
   }
}

/// <summary>
/// True when a Footer section exists and holds at least one entry.
/// </summary>
public bool HasFooters
{
   get { return footer?.Value?.Count > 0; }
}

/// <summary>
/// Invokes the consumer once for every footer entry, passing the key as a string.
/// </summary>
/// <param name="consumer">Callback receiving each footer key and value.</param>
/// <returns>This message instance.</returns>
public IMessage<Stream> ForEachFooter(Action<string, object> consumer)
{
   if (!HasFooters)
   {
      return this;
   }

   foreach (KeyValuePair<Symbol, object> entry in footer.Value)
   {
      consumer(entry.Key.ToString(), entry.Value);
   }

   return this;
}

/// <summary>
/// Looks up the footer entry stored under the given key.
/// </summary>
/// <param name="key">The footer key.</param>
/// <returns>The stored value, or null when there is no such footer entry.</returns>
public object GetFooter(string key)
{
   var entries = footer?.Value;

   if (entries != null && entries.TryGetValue(Symbol.Lookup(key), out object found))
   {
      return found;
   }

   return null;
}

/// <summary>
/// Stores a footer entry, creating the Footer section on demand.
/// </summary>
/// <param name="key">The footer key.</param>
/// <param name="value">The footer value.</param>
/// <returns>This message instance.</returns>
/// <exception cref="ClientIllegalStateException">If called after the message was completed or aborted.</exception>
public IMessage<Stream> SetFooter(string key, object value)
{
   var section = LazyCreateFooter();
   section.Value[Symbol.Lookup(key)] = value;

   return this;
}

/// <summary>
/// Checks whether a footer entry is stored under the given key.
/// </summary>
/// <param name="key">The footer key.</param>
/// <returns>True if a footer entry with that key exists.</returns>
public bool HasFooter(string key)
{
   bool? present = footer?.Value?.ContainsKey(Symbol.Lookup(key));

   return present ?? false;
}

/// <summary>
/// Removes the footer entry stored under the given key.
/// </summary>
/// <param name="key">The footer key.</param>
/// <returns>The value that was stored, or null when there was no such footer entry.</returns>
public object RemoveFooter(string key)
{
   if (!HasFooters)
   {
      return null;
   }

   var symbol = Symbol.Lookup(key);

   footer.Value.TryGetValue(symbol, out object previous);
   footer.Value.Remove(symbol);

   return previous;
}

#endregion
#region AMQP Body access APIs
/// <summary>
/// Reading this property creates a new body stream using default output stream
/// options; every read goes through GetBodyStream and is subject to its state checks.
/// Assigning a body is not supported.
/// </summary>
/// <exception cref="ClientUnsupportedOperationException">Always thrown by the setter.</exception>
public Stream Body
{
   get
   {
      return GetBodyStream(new OutputStreamOptions());
   }
   set
   {
      throw new ClientUnsupportedOperationException("Cannot set an Stream body on a StreamSenderMessage");
   }
}

/// <summary>
/// No-op for a streamed message: the consumer is never invoked because no body
/// sections are retained by this type.
/// </summary>
/// <param name="consumer">Ignored.</param>
/// <returns>This message instance.</returns>
public IAdvancedMessage<Stream> ForEachBodySection(Action<ISection> consumer) => this;
/// <summary>
/// Encodes the given section and appends it to the pending message bytes, first
/// encoding the preamble sections if the message is still in its preamble state.
/// </summary>
/// <param name="section">The body section to encode and append.</param>
/// <returns>This message instance.</returns>
/// <exception cref="ClientIllegalStateException">
/// If the message is completed, aborted, or a body stream is currently active.
/// </exception>
public IAdvancedMessage<Stream> AddBodySection(ISection section)
{
   switch (currentState)
   {
      case StreamState.COMPLETE:
         throw new ClientIllegalStateException("Cannot add more body sections to a completed message");
      case StreamState.ABORTED:
         throw new ClientIllegalStateException("Cannot add more body sections to an aborted message");
      case StreamState.BODY_WRITTING:
         throw new ClientIllegalStateException("Cannot add more body sections while an OutputStream is active");
   }

   TransitionToWritableState();

   var scratch = ProtonByteBufferAllocator.Instance.Allocate();
   var encoded = ClientMessageSupport.EncodeSection(section, scratch).Split();

   AppendDataToBuffer(encoded);

   return this;
}
/// <summary>
/// No-op for a streamed message: body sections are encoded as they are added and
/// are not retained, so there is nothing to clear.
/// </summary>
/// <returns>This message instance.</returns>
public IAdvancedMessage<Stream> ClearBodySections()
{
   return this;
}

/// <summary>
/// Always returns an empty, non-null result because this type does not retain the
/// body sections that were written to it.
/// </summary>
/// <returns>An empty enumeration of sections.</returns>
public IEnumerable<ISection> GetBodySections()
{
   // Non null empty result to indicate no sections; the shared empty array avoids
   // allocating a new zero length array on every call.
   return Array.Empty<ISection>();
}
/// <summary>
/// Encodes and appends each of the given sections by passing them to AddBodySection
/// in enumeration order. Sections written earlier are not removed, they have already
/// been encoded into the outgoing message.
/// </summary>
/// <param name="sections">The sections to append, must not be null.</param>
/// <returns>This message instance.</returns>
/// <exception cref="ArgumentNullException">If <paramref name="sections"/> is null.</exception>
/// <exception cref="ClientIllegalStateException">
/// If the message is completed, aborted, or a body stream is currently active (raised by AddBodySection).
/// </exception>
public IAdvancedMessage<Stream> SetBodySections(IEnumerable<ISection> sections)
{
   if (sections == null)
   {
      // The single string constructor interprets its argument as the parameter name,
      // so the two argument overload is required to report both name and message.
      throw new ArgumentNullException(nameof(sections), "Cannot set body sections with a null enumeration");
   }

   foreach (ISection section in sections)
   {
      AddBodySection(section);
   }

   return this;
}
/// <summary>
/// Creates a stream used to write the message body. When the options carry a body
/// length greater than zero the stream writes a single Data section of that size,
/// otherwise each flush of the stream produces its own Data section. The preamble
/// sections are encoded first if the message is still in its preamble state.
/// </summary>
/// <param name="options">Options that configure the created stream, must not be null.</param>
/// <returns>A new write-only stream that controls the message state until closed.</returns>
/// <exception cref="ArgumentNullException">If <paramref name="options"/> is null.</exception>
/// <exception cref="ClientIllegalStateException">
/// If the message is completed, aborted, or another stream is currently active.
/// </exception>
public Stream GetBodyStream(OutputStreamOptions options)
{
   // Validate before touching any state: failing on a null options instance after
   // the transition below would leave the preamble already encoded and the message
   // moved out of its preamble state.
   if (options == null)
   {
      throw new ArgumentNullException(nameof(options), "Cannot create an OutputStream with null options");
   }

   if (Completed)
   {
      throw new ClientIllegalStateException("Cannot create an OutputStream from a completed send context");
   }

   if (Aborted)
   {
      throw new ClientIllegalStateException("Cannot create an OutputStream from a aborted send context");
   }

   if (currentState == StreamState.BODY_WRITTING)
   {
      throw new ClientIllegalStateException("Cannot add more body sections while an OutputStream is active");
   }

   TransitionToWritableState();

   IProtonBuffer streamBuffer = ProtonByteBufferAllocator.Instance.Allocate(writeBufferSize, writeBufferSize);

   if (options.BodyLength > 0)
   {
      return new SingularDataSectionOutputStream(this, options, streamBuffer);
   }
   else
   {
      return new MultipleDataSectionsOutputStream(this, options, streamBuffer);
   }
}
#endregion
#region Private stream sender message stream state management
// Lifecycle states of a streamed message. The declaration order is significant:
// footer writes are rejected using a ">= COMPLETE" comparison, so COMPLETE and
// ABORTED must remain the last two members.
private enum StreamState
{
// Initial state: header, annotations, properties and message format may still change.
PREAMBLE,
// Preamble sections have been encoded; body sections or a body stream may be added.
BODY_WRITABLE,
// A body stream is active and controls the message until it is closed.
BODY_WRITTING,
// The message has been completed.
COMPLETE,
// The message has been aborted.
ABORTED
}
/// <summary>
/// Verifies that the message is currently in the expected state.
/// </summary>
/// <param name="state">The state the message is required to be in.</param>
/// <param name="errorMessage">Message of the exception thrown on mismatch.</param>
/// <exception cref="ClientIllegalStateException">If the current state differs from the expected one.</exception>
private void CheckStreamState(StreamState state, string errorMessage)
{
   if (currentState == state)
   {
      return;
   }

   throw new ClientIllegalStateException(errorMessage);
}
/// <summary>
/// Returns the Header section, creating it if needed; only allowed in the preamble state.
/// </summary>
/// <exception cref="ClientIllegalStateException">If the preamble state has been left.</exception>
private Header LazyCreateHeader()
{
   CheckStreamState(StreamState.PREAMBLE, "Cannot write to Message Header after body writing has started.");

   if (header == null)
   {
      header = new Header();
   }

   return header;
}

/// <summary>
/// Returns the Properties section, creating it if needed; only allowed in the preamble state.
/// </summary>
/// <exception cref="ClientIllegalStateException">If the preamble state has been left.</exception>
private Properties LazyCreateProperties()
{
   CheckStreamState(StreamState.PREAMBLE, "Cannot write to Message Properties after body writing has started.");

   if (properties == null)
   {
      properties = new Properties();
   }

   return properties;
}

/// <summary>
/// Returns the Application Properties section, creating it backed by an empty
/// dictionary if needed; only allowed in the preamble state.
/// </summary>
/// <exception cref="ClientIllegalStateException">If the preamble state has been left.</exception>
private ApplicationProperties LazyCreateApplicationProperties()
{
   CheckStreamState(StreamState.PREAMBLE, "Cannot write to Message Application Properties after body writing has started.");

   if (applicationProperties == null)
   {
      applicationProperties = new ApplicationProperties(new Dictionary<string, object>());
   }

   return applicationProperties;
}

/// <summary>
/// Returns the Message Annotations section, creating it backed by an empty
/// dictionary if needed; only allowed in the preamble state.
/// </summary>
/// <exception cref="ClientIllegalStateException">If the preamble state has been left.</exception>
private MessageAnnotations LazyCreateMessageAnnotations()
{
   CheckStreamState(StreamState.PREAMBLE, "Cannot write to Message Annotations after body writing has started.");

   if (annotations == null)
   {
      annotations = new MessageAnnotations(new Dictionary<Symbol, object>());
   }

   return annotations;
}

/// <summary>
/// Returns the Footer section, creating it backed by an empty dictionary if needed.
/// Allowed in every state except completed and aborted.
/// </summary>
/// <exception cref="ClientIllegalStateException">If the message was completed or aborted.</exception>
private Footer LazyCreateFooter()
{
   // COMPLETE and ABORTED are the only states at or beyond COMPLETE.
   if (Completed || Aborted)
   {
      throw new ClientIllegalStateException(
         "Cannot write to Message Footer after message has been marked completed or aborted.");
   }

   if (footer == null)
   {
      footer = new Footer(new Dictionary<Symbol, object>());
   }

   return footer;
}
/// <summary>
/// Adds the given bytes to the pending message data and hands everything pending
/// to the sender once the readable byte count reaches the write buffer size.
/// </summary>
/// <param name="incoming">Buffer holding the encoded bytes to append.</param>
private void AppendDataToBuffer(IProtonBuffer incoming)
{
   if (buffer == null)
   {
      buffer = incoming;
   }
   else if (buffer is ProtonCompositeBuffer composite)
   {
      // When appending buffers we should ensure that the buffer instance is only
      // readable to ensure that the composite will accept a chain of buffers which
      // must not have any unwritten gaps.
      composite.Append(incoming);
   }
   else
   {
      buffer = IProtonCompositeBuffer.Compose(buffer, incoming);
   }

   // We aren't currently attempting to optimize each outbound chunk of the streaming
   // send: if the accumulated block is larger than the write buffer we don't try to
   // split it but instead let the frame writer write multiple frames. This can
   // result in a trailing single tiny frame but for now that case isn't optimized.
   if (buffer.ReadableBytes < writeBufferSize)
   {
      return;
   }

   try
   {
      sender.DoStreamMessage(this, buffer, messageFormat);
   }
   finally
   {
      // The pending data is dropped whether or not the send succeeded.
      buffer = null;
   }
}
/// <summary>
/// Hands any pending readable bytes to the sender regardless of how many have been
/// accumulated; does nothing when no readable bytes are pending.
/// </summary>
private void DoFlush()
{
   if (buffer == null || !buffer.IsReadable)
   {
      return;
   }

   try
   {
      sender.DoStreamMessage(this, buffer, messageFormat);
   }
   finally
   {
      // The pending data is dropped whether or not the send succeeded.
      buffer = null;
   }
}
/// <summary>
/// Encodes the given section and appends it to the pending message data without
/// performing the preamble transition.
/// </summary>
/// <param name="section">The section to encode and append.</param>
/// <returns>This message instance.</returns>
/// <exception cref="ClientIllegalStateException">If the message is aborted or completed.</exception>
private ClientStreamSenderMessage Write(ISection section)
{
   switch (currentState)
   {
      case StreamState.ABORTED:
         throw new ClientIllegalStateException("Cannot write a Section to an already aborted send context");
      case StreamState.COMPLETE:
         throw new ClientIllegalStateException("Cannot write a Section to an already completed send context");
   }

   var scratch = ProtonByteBufferAllocator.Instance.Allocate();
   var encoded = ClientMessageSupport.EncodeSection(section, scratch).Split();

   AppendDataToBuffer(encoded);

   return this;
}
/// <summary>
/// Moves the message from the preamble state to the body writable state, encoding
/// every preamble section that is present in the order Header, Delivery Annotations,
/// Message Annotations, Properties and Application Properties. Does nothing when the
/// message has already left the preamble state.
/// </summary>
private void TransitionToWritableState()
{
   if (currentState != StreamState.PREAMBLE)
   {
      return;
   }

   var allocator = ProtonByteBufferAllocator.Instance;

   if (header != null)
   {
      AppendDataToBuffer(ClientMessageSupport.EncodeSection(header, allocator.Allocate()).Split());
   }

   if (deliveryAnnotations != null)
   {
      AppendDataToBuffer(ClientMessageSupport.EncodeSection(deliveryAnnotations, allocator.Allocate()).Split());
   }

   if (annotations != null)
   {
      AppendDataToBuffer(ClientMessageSupport.EncodeSection(annotations, allocator.Allocate()).Split());
   }

   if (properties != null)
   {
      AppendDataToBuffer(ClientMessageSupport.EncodeSection(properties, allocator.Allocate()).Split());
   }

   if (applicationProperties != null)
   {
      AppendDataToBuffer(ClientMessageSupport.EncodeSection(applicationProperties, allocator.Allocate()).Split());
   }

   currentState = StreamState.BODY_WRITABLE;
}
#endregion
#region Stream implementations used for writing body sections
private abstract class StreamMessageOutputStream : Stream
{
// Set once on the first Close call so that the close logic runs a single time.
protected readonly AtomicBoolean closed = new AtomicBoolean();
// Options the stream was created with (body length limit, complete on close).
protected readonly OutputStreamOptions options;
// Staging buffer for written bytes; its content is copied into the message on flush.
protected IProtonBuffer streamBuffer;
// The message this stream writes into.
protected readonly ClientStreamSenderMessage message;
// Running count of bytes written through this stream.
protected int bytesWritten;

/// <summary>
/// Creates the stream and marks the message as having an active body stream.
/// </summary>
/// <param name="message">The message this stream writes into.</param>
/// <param name="options">The options configuring this stream.</param>
/// <param name="buffer">The staging buffer used for written bytes.</param>
public StreamMessageOutputStream(ClientStreamSenderMessage message, OutputStreamOptions options, IProtonBuffer buffer)
{
   this.message = message;
   this.options = options;
   streamBuffer = buffer;

   // Stream takes control of state until closed.
   message.currentState = StreamState.BODY_WRITTING;
}
#region Stream API implementation

// Data is sent out in batches as it is written, so the stream cannot be read from
// and cannot seek. The length is never really known because one data section may
// have been written and another started, and the position cannot be changed
// because a batch that was written and forgotten cannot be revisited.

public sealed override bool CanTimeout
{
   get { return false; }
}

public sealed override bool CanRead
{
   get { return false; }
}

public sealed override bool CanWrite
{
   get { return true; }
}

public sealed override bool CanSeek
{
   get { return false; }
}

public sealed override long Length
{
   get { throw new NotImplementedException("No length value available"); }
}

public sealed override long Position
{
   get { throw new NotImplementedException("Cannot read a position from a streamed message stream"); }
   set { throw new NotImplementedException("Cannot assign a position to a streamed message stream"); }
}

#endregion
/// <summary>
/// Writes a single byte into the staging buffer, flushing when the buffer becomes full.
/// </summary>
/// <param name="value">The byte to write.</param>
/// <exception cref="IOException">
/// If the stream or its sender is closed, the message is already completed, or the
/// write would exceed the configured body length.
/// </exception>
public override void WriteByte(byte value)
{
   CheckClosed();
   CheckOutputLimitReached(1);

   streamBuffer.WriteUnsignedByte(value);

   // Count the byte before any flush: Flush compares the running total against the
   // configured body length to decide whether this write finished the body, which
   // is also how the array based Write accounts for its bytes.
   bytesWritten++;

   if (!streamBuffer.IsWritable)
   {
      Flush();
   }
}
/// <summary>
/// Writes the bytes of the given span by copying them into an array and delegating
/// to the array based Write.
/// </summary>
/// <param name="bytes">The bytes to write.</param>
public override void Write(ReadOnlySpan<byte> bytes)
{
   byte[] copy = bytes.ToArray();

   Write(copy, 0, copy.Length);
}
/// <summary>
/// Writes a range of the given array into the staging buffer, flushing whenever the
/// buffer fills up. Input larger than the remaining buffer space is written in chunks.
/// </summary>
/// <param name="bytes">Array holding the bytes to write.</param>
/// <param name="offset">Index of the first byte to write.</param>
/// <param name="length">Number of bytes to write.</param>
/// <exception cref="IOException">
/// If the stream or its sender is closed, the message is already completed, or the
/// write would exceed the configured body length.
/// </exception>
public override void Write(byte[] bytes, int offset, int length)
{
   CheckClosed();
   CheckOutputLimitReached(length);

   if (length <= streamBuffer.WritableBytes)
   {
      // Everything fits into the space that is left in the staging buffer.
      streamBuffer.WriteBytes(bytes, offset, length);
      bytesWritten += length;

      if (!streamBuffer.IsWritable)
      {
         Flush();
      }

      return;
   }

   // Fill the staging buffer as often as needed, flushing each time it is full.
   int consumed = 0;

   while (consumed < length)
   {
      int chunk = (int)Math.Min(length - consumed, streamBuffer.WritableBytes);

      bytesWritten += chunk;
      streamBuffer.WriteBytes(bytes, offset + consumed, chunk);

      if (!streamBuffer.IsWritable)
      {
         Flush();
      }

      consumed += chunk;
   }
}
/// <summary>
/// Pushes the staged bytes into the message. The flush also completes the message
/// when a body length was configured, exactly that many bytes have been written and
/// the options request completion on close.
/// </summary>
/// <exception cref="IOException">If the stream or its sender is closed, or the flush fails.</exception>
public override void Flush()
{
   CheckClosed();

   bool bodyFinished = options.BodyLength > 0
      && bytesWritten == options.BodyLength
      && options.CompleteSendOnClose;

   DoFlushPending(bodyFinished);
}
/// <summary>
/// Closes the stream and returns control of the message state. If a body length was
/// configured but not fully written the message is aborted, otherwise the staged
/// bytes are flushed and the message is completed when the options request it.
/// Only the first call has any effect, and nothing is done when the message has
/// already been completed or aborted.
/// </summary>
/// <exception cref="IOException">If aborting or flushing the message fails.</exception>
public override void Close()
{
   if (!closed.CompareAndSet(false, true))
   {
      return;
   }

   // A message that was completed or aborted while the stream was open must keep
   // that terminal state. Resetting an aborted message to writable would cause a
   // second abort, or a flush and completion, against an already aborted delivery.
   if (message.Completed || message.Aborted)
   {
      return;
   }

   message.currentState = StreamState.BODY_WRITABLE;

   if (options.BodyLength > 0 && options.BodyLength != bytesWritten)
   {
      // Limit was set but user did not write all of it so we must abort.
      try
      {
         message.Abort();
      }
      catch (ClientException e)
      {
         throw new IOException(e.Message, e);
      }
   }
   else
   {
      // Limit not set or was set and user wrote that many bytes so we can complete.
      DoFlushPending(options.CompleteSendOnClose);
   }
}
/// <summary>
/// Rejects a write when the message is already completed or when the write would
/// push the total beyond the configured body length.
/// </summary>
/// <param name="writeSize">Number of bytes about to be written.</param>
/// <exception cref="IOException">If the write is not permitted.</exception>
private void CheckOutputLimitReached(int writeSize)
{
   if (message.Completed)
   {
      throw new IOException("Cannot write to an already completed message output stream");
   }

   int limit = options.BodyLength;
   bool limited = limit > 0;

   if (limited && bytesWritten + writeSize > limit)
   {
      throw new IOException("Cannot write beyond configured stream output limit");
   }
}
/// <summary>
/// Ensures that neither this stream nor the sender of its message has been closed.
/// </summary>
/// <exception cref="IOException">If the stream or the parent sender is closed.</exception>
private void CheckClosed()
{
   bool streamClosed = closed.Get();

   if (streamClosed)
   {
      throw new IOException("The OutputStream has already been closed.");
   }

   if (message.sender.IsClosed)
   {
      throw new IOException("The parent Sender instance has already been closed.");
   }
}
/// <summary>
/// Copies any staged bytes into the message and then either completes the message
/// or flushes its pending data, resetting the staging buffer afterwards.
/// </summary>
/// <param name="complete">True to complete the message, false to only flush it.</param>
/// <exception cref="IOException">Wraps any ClientException raised by the message.</exception>
protected virtual void DoFlushPending(bool complete)
{
   try
   {
      if (streamBuffer.IsReadable)
      {
         // Copy the buffer as it will be reset and reused and we cannot
         // assume that the buffer will be fully written by the IO layer
         // before the next write operation is allowed to proceed.
         message.AppendDataToBuffer(streamBuffer.Copy());
      }

      if (complete)
      {
         message.Complete();
      }
      else
      {
         message.DoFlush();
      }

      // Reset in both cases: the staged bytes now belong to the message. Leaving
      // them readable after a completing flush would make a later Flush call append
      // the same bytes again to a message that is already complete.
      streamBuffer.Reset();
   }
   catch (ClientException e)
   {
      throw new IOException(e.Message, e);
   }
}
/// <summary>Not available, this stream is write-only.</summary>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public override int Read(byte[] buffer, int offset, int count) =>
   throw new NotImplementedException("Cannot read from a streamed message output stream");

/// <summary>Not available, this stream cannot seek.</summary>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public override long Seek(long offset, SeekOrigin origin) =>
   throw new NotImplementedException("Cannot seek within a streamed message output stream");

/// <summary>Not available, the length of this stream cannot be changed.</summary>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public override void SetLength(long value) =>
   throw new NotImplementedException("Cannot change length of a streamed message output stream");
}
// Stream returned by RawOutputStream(). It adds nothing to the base class behavior
// and is created with default OutputStreamOptions, so the written bytes are copied
// into the message without this stream adding any Data section preamble.
private sealed class SendContextRawBytesOutputStream : StreamMessageOutputStream
{
// Creates the raw stream over the given message using the supplied staging buffer.
public SendContextRawBytesOutputStream(ClientStreamSenderMessage message, IProtonBuffer buffer)
: base(message, new OutputStreamOptions(), buffer)
{
}
}
/// <summary>
/// Stream used when a body length is configured: a single Data section header
/// carrying that length is appended to the message on construction and every byte
/// written afterwards is forwarded as the content of that one section.
/// </summary>
private sealed class SingularDataSectionOutputStream : StreamMessageOutputStream
{
   /// <summary>
   /// Creates the stream and appends the Data section header for the configured body length.
   /// </summary>
   /// <param name="message">The message this stream writes into.</param>
   /// <param name="options">The options carrying the body length.</param>
   /// <param name="buffer">The staging buffer used for written bytes.</param>
   public SingularDataSectionOutputStream(ClientStreamSenderMessage message, OutputStreamOptions options, IProtonBuffer buffer)
      : base(message, options, buffer)
   {
      int headerSize = DATA_SECTION_HEADER_ENCODING_SIZE;

      // Preamble bytes followed by the total body size announced up front.
      IProtonBuffer sectionHeader = ProtonByteBufferAllocator.Instance.Allocate(headerSize, headerSize);
      sectionHeader.WriteBytes(DATA_SECTION_PREAMBLE);
      sectionHeader.WriteInt(options.BodyLength);

      message.AppendDataToBuffer(sectionHeader);
   }
}
/// <summary>
/// Stream used when no body length is configured: each flush that has staged bytes
/// is preceded by its own Data section header sized to those bytes, so the body is
/// sent as a series of Data sections.
/// </summary>
private sealed class MultipleDataSectionsOutputStream : StreamMessageOutputStream
{
   /// <summary>
   /// Creates the stream over the given message.
   /// </summary>
   /// <param name="message">The message this stream writes into.</param>
   /// <param name="options">The options configuring this stream.</param>
   /// <param name="buffer">The staging buffer used for written bytes.</param>
   public MultipleDataSectionsOutputStream(ClientStreamSenderMessage message, OutputStreamOptions options, IProtonBuffer buffer)
      : base(message, options, buffer)
   {
   }

   /// <summary>
   /// Appends a Data section header for the staged bytes, if there are any, and
   /// then performs the regular flush.
   /// </summary>
   /// <param name="complete">True to complete the message, false to only flush it.</param>
   /// <exception cref="IOException">Wraps any ClientException raised by the message.</exception>
   protected override void DoFlushPending(bool complete)
   {
      if (streamBuffer.IsReadable)
      {
         AppendDataSectionHeader((int)streamBuffer.ReadableBytes);
      }

      base.DoFlushPending(complete);
   }

   // Builds the Data section header for a payload of the given size and appends it
   // to the message ahead of the payload bytes.
   private void AppendDataSectionHeader(int payloadSize)
   {
      IProtonBuffer sectionHeader = ProtonByteBufferAllocator.Instance.Allocate(
         DATA_SECTION_HEADER_ENCODING_SIZE, DATA_SECTION_HEADER_ENCODING_SIZE);

      sectionHeader.WriteBytes(DATA_SECTION_PREAMBLE);
      sectionHeader.WriteInt(payloadSize);

      try
      {
         message.AppendDataToBuffer(sectionHeader);
      }
      catch (ClientException e)
      {
         throw new IOException(e.Message, e);
      }
   }
}
#endregion
}
}