blob: 05db7e253f5ad9a6ae97bbd6feaa5d15941d0eef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.IO;
using Apache.Qpid.Proton.Buffer;
using Apache.Qpid.Proton.Client.Exceptions;
using Apache.Qpid.Proton.Types;
using Apache.Qpid.Proton.Types.Messaging;
namespace Apache.Qpid.Proton.Client.Implementation
{
public sealed class ClientStreamSenderMessage : IStreamSenderMessage
{
// Stream sender this message belongs to; abort, complete and flush requests are forwarded to it.
private readonly ClientStreamSender sender;
// Delivery annotations supplied at construction time; not read by any code visible in this file.
private readonly DeliveryAnnotations deliveryAnnotations;
// Buffer size computed in the constructor: never below StreamSenderOptions.MIN_BUFFER_SIZE_LIMIT.
private readonly uint writeBufferSize;
// NOTE(review): never assigned in the visible code, so DoFlush passes null to SendMessage — confirm once the real packet type lands.
private readonly IAdvancedMessage<object> streamMessagePacket; // TODO Temporary variable
// TODO private readonly StreamMessagePacket streamMessagePacket = new StreamMessagePacket();
// Tracker supplied at construction time; also the source of the proton delivery (see ProtonDelivery).
private readonly ClientStreamTracker tracker;
// Message sections; each one stays null until assigned or lazily created through its accessors.
private Header header;
private MessageAnnotations annotations;
private Properties properties;
private ApplicationProperties applicationProperties;
private Footer footer;
// Pending encoded bytes. Only tested and reset to null in the visible code;
// nothing here assigns a non-null value yet (AppendDataToBuffer is unimplemented).
private IProtonBuffer buffer;
// Backing field for the MessageFormat property.
private volatile uint messageFormat;
// Current position in the stream life cycle; every message starts in PREAMBLE.
private StreamState currentState = StreamState.PREAMBLE;
/// <summary>
/// Creates a streaming message bound to the given sender and tracker and derives the
/// write buffer size that will be used for it.
/// </summary>
/// <param name="sender">Stream sender whose options and connection determine the buffer size.</param>
/// <param name="tracker">Tracker associated with this message.</param>
/// <param name="deliveryAnnotations">Delivery annotations retained for this message.</param>
internal ClientStreamSenderMessage(ClientStreamSender sender, ClientStreamTracker tracker, DeliveryAnnotations deliveryAnnotations)
{
    this.sender = sender;
    this.deliveryAnnotations = deliveryAnnotations;
    this.tracker = tracker;

    // A configured (non-zero) write buffer size wins; otherwise the connection's max
    // frame size is used. In both cases the minimum buffer size limit is the floor.
    uint requestedSize = sender.Options.WriteBufferSize > 0
        ? sender.Options.WriteBufferSize
        : (uint)sender.ProtonSender.Connection.MaxFrameSize;

    writeBufferSize = Math.Max(StreamSenderOptions.MIN_BUFFER_SIZE_LIMIT, requestedSize);
}
/// <summary>Outgoing proton delivery, read from the tracker backing this message.</summary>
internal Engine.IOutgoingDelivery ProtonDelivery
{
    get { return tracker.ProtonDelivery; }
}

/// <summary>Tracker that was supplied when this message was created.</summary>
public IStreamTracker Tracker
{
    get { return tracker; }
}

/// <summary>Stream sender that was supplied when this message was created.</summary>
public IStreamSender Sender
{
    get { return sender; }
}
/// <summary>
/// Message format value for this message. It can only be changed while the
/// message is still in the PREAMBLE state.
/// </summary>
/// <exception cref="ClientIllegalStateException">
/// Thrown by the setter when the stream state is anything other than PREAMBLE.
/// </exception>
public uint MessageFormat
{
    get { return messageFormat; }
    set
    {
        CheckStreamState(StreamState.PREAMBLE, "Cannot set message format after body writes have started.");
        messageFormat = value;
    }
}
/// <summary>True once the stream state has reached COMPLETE.</summary>
public bool Completed
{
    get { return currentState == StreamState.COMPLETE; }
}

/// <summary>True once the stream state has been set to ABORTED.</summary>
public bool Aborted
{
    get { return currentState == StreamState.ABORTED; }
}
/// <summary>
/// Marks this message as aborted and asks the sender to abort the delivery.
/// Calling it again on an already aborted message does nothing.
/// </summary>
/// <returns>This message instance.</returns>
/// <exception cref="ClientIllegalStateException">If the message was already completed.</exception>
public IStreamSenderMessage Abort()
{
    if (Completed)
    {
        throw new ClientIllegalStateException("Cannot abort an already completed send context");
    }

    if (Aborted)
    {
        return this;
    }

    // Flip the state before notifying the sender, matching the order callers observe today.
    currentState = StreamState.ABORTED;
    sender.Abort(ProtonDelivery, tracker);

    return this;
}
/// <summary>
/// Marks this message as complete. A footer, when one is set, is written first; then any
/// readable buffered data is flushed, or otherwise the sender is asked to complete the delivery.
/// Calling it again on an already completed message does nothing.
/// </summary>
/// <returns>This message instance.</returns>
/// <exception cref="ClientIllegalStateException">If the message was already aborted.</exception>
public IStreamSenderMessage Complete()
{
    if (Aborted)
    {
        throw new ClientIllegalStateException("Cannot complete an already aborted send context");
    }

    if (Completed)
    {
        return this;
    }

    // The footer has to be written before the state changes because Write
    // rejects sections once the message is marked completed.
    if (footer != null)
    {
        Write(footer);
    }

    currentState = StreamState.COMPLETE;

    // Buffered bytes are handed to the flush path; with nothing buffered the
    // sender is told directly that the delivery is complete.
    bool hasBufferedData = buffer != null && buffer.IsReadable;
    if (hasBufferedData)
    {
        DoFlush();
    }
    else
    {
        sender.Complete(ProtonDelivery, tracker);
    }

    return this;
}
/// <summary>
/// Direct encoding is not supported for a stream sender message; this always throws.
/// </summary>
/// <param name="deliveryAnnotations">Unused.</param>
/// <exception cref="ClientUnsupportedOperationException">Always thrown.</exception>
public IProtonBuffer Encode(IDictionary<string, object> deliveryAnnotations) =>
    throw new ClientUnsupportedOperationException("StreamSenderMessage cannot be directly encoded");

/// <summary>
/// Not implemented yet; this always throws.
/// </summary>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public Stream RawOutputStream() =>
    throw new NotImplementedException();
#region AMQP Header access APIs
/// <summary>
/// The AMQP header section of this message (null until assigned or lazily created).
/// </summary>
/// <exception cref="ClientIllegalStateException">
/// Thrown by the setter when the stream state is anything other than PREAMBLE.
/// </exception>
public Header Header
{
    get { return header; }
    set
    {
        CheckStreamState(StreamState.PREAMBLE, "Cannot write to Message Header after body writing has started.");
        header = value;
    }
}
// Header value accessors. Each getter reads from the header when one exists and otherwise
// returns the matching Header.DEFAULT_* constant. Each setter goes through LazyCreateHeader,
// which creates the header on demand and throws ClientIllegalStateException unless the
// stream state is PREAMBLE.

/// <summary>Durable flag of the header.</summary>
public bool Durable
{
    get { return header?.Durable ?? Header.DEFAULT_DURABILITY; }
    set { LazyCreateHeader().Durable = value; }
}

/// <summary>Priority value of the header.</summary>
public byte Priority
{
    get { return header?.Priority ?? Header.DEFAULT_PRIORITY; }
    set { LazyCreateHeader().Priority = value; }
}

/// <summary>Time-to-live value of the header.</summary>
public uint TimeToLive
{
    get { return header?.TimeToLive ?? Header.DEFAULT_TIME_TO_LIVE; }
    set { LazyCreateHeader().TimeToLive = value; }
}

/// <summary>First-acquirer flag of the header.</summary>
public bool FirstAcquirer
{
    get { return header?.FirstAcquirer ?? Header.DEFAULT_FIRST_ACQUIRER; }
    set { LazyCreateHeader().FirstAcquirer = value; }
}

/// <summary>Delivery count value of the header.</summary>
public uint DeliveryCount
{
    get { return header?.DeliveryCount ?? Header.DEFAULT_DELIVERY_COUNT; }
    set { LazyCreateHeader().DeliveryCount = value; }
}
#endregion
#region AMQP Properties access APIs
/// <summary>
/// The AMQP properties section of this message (null until assigned or lazily created).
/// </summary>
/// <exception cref="ClientIllegalStateException">
/// Thrown by the setter when the stream state is anything other than PREAMBLE.
/// </exception>
public Properties Properties
{
    get { return properties; }
    set
    {
        CheckStreamState(StreamState.PREAMBLE, "Cannot write to Message Properties after body writing has started.");
        properties = value;
    }
}

/// <summary>
/// Message id from the properties section, or null when no properties exist.
/// The setter creates the properties section on demand via LazyCreateProperties.
/// </summary>
public object MessageId
{
    get { return properties?.MessageId; }
    set { LazyCreateProperties().MessageId = value; }
}
/// <summary>
/// User id carried in the properties section, exposed as a byte array.
/// </summary>
/// <remarks>
/// The getter returns a fresh copy of the readable bytes, or null when there are no
/// properties, no user id, or the user id has no readable bytes. The setter wraps the
/// supplied array (it is not copied here); assigning null clears the user id so that a
/// value read from the getter can always be written back.
/// </remarks>
/// <exception cref="ClientIllegalStateException">
/// Thrown by the setter (through LazyCreateProperties) when the stream state is not PREAMBLE.
/// </exception>
public byte[] UserId
{
    get
    {
        var userId = properties?.UserId;
        if (!(userId?.ReadableBytes > 0))
        {
            return null;
        }

        byte[] result = new byte[userId.ReadableBytes];
        userId.CopyInto(userId.ReadOffset, result, 0, result.LongLength);
        return result;
    }
    set
    {
        // Null means "no user id": store null instead of handing a null array to the
        // allocator, whose handling of null input is not defined by this class.
        LazyCreateProperties().UserId = value == null ? null : ProtonByteBufferAllocator.Instance.Wrap(value);
    }
}
// Properties section accessors. Each getter reads from the properties section when one exists;
// reference-typed values come back as null and numeric values as 0 when it does not. Each setter
// goes through LazyCreateProperties, which creates the section on demand and throws
// ClientIllegalStateException unless the stream state is PREAMBLE.

/// <summary>The "to" value of the properties section.</summary>
public string To
{
    get { return properties?.To; }
    set { LazyCreateProperties().To = value; }
}

/// <summary>The subject value of the properties section.</summary>
public string Subject
{
    get { return properties?.Subject; }
    set { LazyCreateProperties().Subject = value; }
}

/// <summary>The reply-to value of the properties section.</summary>
public string ReplyTo
{
    get { return properties?.ReplyTo; }
    set { LazyCreateProperties().ReplyTo = value; }
}

/// <summary>The correlation id value of the properties section.</summary>
public object CorrelationId
{
    get { return properties?.CorrelationId; }
    set { LazyCreateProperties().CorrelationId = value; }
}

/// <summary>The content type value of the properties section.</summary>
public string ContentType
{
    get { return properties?.ContentType; }
    set { LazyCreateProperties().ContentType = value; }
}

/// <summary>The content encoding value of the properties section.</summary>
public string ContentEncoding
{
    get { return properties?.ContentEncoding; }
    set { LazyCreateProperties().ContentEncoding = value; }
}

/// <summary>The absolute expiry time value of the properties section (0 when absent).</summary>
public ulong AbsoluteExpiryTime
{
    get { return properties?.AbsoluteExpiryTime ?? 0; }
    set { LazyCreateProperties().AbsoluteExpiryTime = value; }
}

/// <summary>The creation time value of the properties section (0 when absent).</summary>
public ulong CreationTime
{
    get { return properties?.CreationTime ?? 0; }
    set { LazyCreateProperties().CreationTime = value; }
}

/// <summary>The group id value of the properties section.</summary>
public string GroupId
{
    get { return properties?.GroupId; }
    set { LazyCreateProperties().GroupId = value; }
}

/// <summary>The group sequence value of the properties section (0 when absent).</summary>
public uint GroupSequence
{
    get { return properties?.GroupSequence ?? 0; }
    set { LazyCreateProperties().GroupSequence = value; }
}

/// <summary>The reply-to group id value of the properties section.</summary>
public string ReplyToGroupId
{
    get { return properties?.ReplyToGroupId; }
    set { LazyCreateProperties().ReplyToGroupId = value; }
}
#endregion
#region AMQP Message Annotations access APIs
/// <summary>
/// The message annotations section of this message (null until assigned or lazily created).
/// </summary>
/// <exception cref="ClientIllegalStateException">
/// Thrown by the setter when the stream state is anything other than PREAMBLE.
/// </exception>
public MessageAnnotations Annotations
{
    get { return annotations; }
    set
    {
        CheckStreamState(StreamState.PREAMBLE, "Cannot write to Message Annotations after body writing has started.");
        annotations = value;
    }
}

/// <summary>
/// True when an annotations section exists, its value is non-null and it holds at least one entry.
/// </summary>
public bool HasAnnotations
{
    get { return annotations?.Value?.Count > 0; }
}
/// <summary>
/// Invokes the given action once per message annotation, passing the key's string form and the value.
/// Nothing is invoked when there are no annotations.
/// </summary>
/// <param name="consumer">Callback receiving each annotation key and value.</param>
/// <returns>This message instance.</returns>
public IMessage<Stream> ForEachAnnotation(Action<string, object> consumer)
{
    if (!HasAnnotations)
    {
        return this;
    }

    foreach (KeyValuePair<Symbol, object> annotation in annotations.Value)
    {
        consumer(annotation.Key.ToString(), annotation.Value);
    }

    return this;
}
/// <summary>
/// Looks up a single message annotation by key.
/// </summary>
/// <param name="key">Annotation name; it is converted to a Symbol for the lookup.</param>
/// <returns>The stored value, or null when there is no annotations map or no such key.</returns>
public object GetAnnotation(string key)
{
    var entries = Annotations?.Value;
    if (entries == null)
    {
        return null;
    }

    // On a miss TryGetValue leaves the out value at its default, i.e. null.
    object found;
    entries.TryGetValue(Symbol.Lookup(key), out found);
    return found;
}
/// <summary>
/// Stores a message annotation under the given key, creating the annotations section on demand.
/// </summary>
/// <param name="key">Annotation name; it is converted to a Symbol.</param>
/// <param name="value">Value to store.</param>
/// <returns>This message instance.</returns>
/// <exception cref="ClientIllegalStateException">
/// Thrown (through LazyCreateMessageAnnotations) when the stream state is not PREAMBLE.
/// </exception>
public IMessage<Stream> SetAnnotation(string key, object value)
{
    MessageAnnotations target = LazyCreateMessageAnnotations();
    target.Value[Symbol.Lookup(key)] = value;
    return this;
}

/// <summary>
/// Reports whether a message annotation with the given key is present.
/// </summary>
/// <param name="key">Annotation name; it is converted to a Symbol for the lookup.</param>
/// <returns>True when the key is present; false otherwise, including when no annotations map exists.</returns>
public bool HasAnnotation(string key)
{
    bool? present = annotations?.Value?.ContainsKey(Symbol.Lookup(key));
    return present ?? false;
}
/// <summary>
/// Removes a message annotation and hands back the value it held.
/// </summary>
/// <param name="key">Annotation name; it is converted to a Symbol.</param>
/// <returns>The removed value, or null when there are no annotations or the key was absent.</returns>
public object RemoveAnnotation(string key)
{
    if (!HasAnnotations)
    {
        return null;
    }

    var symbolKey = Symbol.Lookup(key);

    // Capture the current value first so it can be returned after the entry is removed.
    object previous;
    annotations.Value.TryGetValue(symbolKey, out previous);
    annotations.Value.Remove(symbolKey);

    return previous;
}
#endregion
#region AMQP Application Properties access APIs
/// <summary>
/// The application properties section of this message (null until assigned or lazily created).
/// </summary>
/// <exception cref="ClientIllegalStateException">
/// Thrown by the setter when the stream state is anything other than PREAMBLE.
/// </exception>
public ApplicationProperties ApplicationProperties
{
    get { return applicationProperties; }
    set
    {
        CheckStreamState(StreamState.PREAMBLE, "Cannot write to Message Application Properties after body writing has started.");
        applicationProperties = value;
    }
}

/// <summary>
/// True when an application properties section exists, its value is non-null and it holds at least one entry.
/// </summary>
public bool HasProperties
{
    get { return applicationProperties?.Value?.Count > 0; }
}
/// <summary>
/// Invokes the given action once per application property, passing its key and value.
/// Nothing is invoked when there are no application properties.
/// </summary>
/// <param name="consumer">Callback receiving each property key and value.</param>
/// <returns>This message instance.</returns>
public IMessage<Stream> ForEachProperty(Action<string, object> consumer)
{
    if (!HasProperties)
    {
        return this;
    }

    foreach (KeyValuePair<string, object> property in applicationProperties.Value)
    {
        consumer(property.Key, property.Value);
    }

    return this;
}
/// <summary>
/// Looks up a single application property by key.
/// </summary>
/// <param name="key">Property name.</param>
/// <returns>The stored value, or null when there is no properties map or no such key.</returns>
public object GetProperty(string key)
{
    var entries = applicationProperties?.Value;
    if (entries == null)
    {
        return null;
    }

    // On a miss TryGetValue leaves the out value at its default, i.e. null.
    object found;
    entries.TryGetValue(key, out found);
    return found;
}
/// <summary>
/// Stores an application property under the given key, creating the section on demand.
/// </summary>
/// <param name="key">Property name.</param>
/// <param name="value">Value to store.</param>
/// <returns>This message instance.</returns>
/// <exception cref="ClientIllegalStateException">
/// Thrown (through LazyCreateApplicationProperties) when the stream state is not PREAMBLE.
/// </exception>
public IMessage<Stream> SetProperty(string key, object value)
{
    ApplicationProperties target = LazyCreateApplicationProperties();
    target.Value[key] = value;
    return this;
}

/// <summary>
/// Reports whether an application property with the given key is present.
/// </summary>
/// <param name="key">Property name.</param>
/// <returns>True when the key is present; false otherwise, including when no properties map exists.</returns>
public bool HasProperty(string key)
{
    bool? present = applicationProperties?.Value?.ContainsKey(key);
    return present ?? false;
}
/// <summary>
/// Removes an application property and hands back the value it held.
/// </summary>
/// <param name="key">Property name.</param>
/// <returns>The removed value, or null when there are no properties or the key was absent.</returns>
public object RemoveProperty(string key)
{
    if (!HasProperties)
    {
        return null;
    }

    // Capture the current value first so it can be returned after the entry is removed.
    object previous;
    applicationProperties.Value.TryGetValue(key, out previous);
    applicationProperties.Value.Remove(key);

    return previous;
}
#endregion
#region AMQP Footer access APIs
/// <summary>
/// The footer section of this message (null until assigned or lazily created). Unlike the
/// other sections it may still be replaced after the preamble, up until the message is
/// completed or aborted.
/// </summary>
/// <exception cref="ClientIllegalStateException">
/// Thrown by the setter once the stream state is COMPLETE or later.
/// </exception>
public Footer Footer
{
    get { return footer; }
    set
    {
        // Relies on COMPLETE and ABORTED being the last members of StreamState.
        if (currentState < StreamState.COMPLETE)
        {
            footer = value;
            return;
        }

        throw new ClientIllegalStateException(
            "Cannot write to Message Footer after message has been marked completed or aborted.");
    }
}

/// <summary>
/// True when a footer section exists, its value is non-null and it holds at least one entry.
/// </summary>
public bool HasFooters
{
    get { return footer?.Value?.Count > 0; }
}
/// <summary>
/// Invokes the given action once per footer entry, passing the key's string form and the value.
/// Nothing is invoked when there are no footers.
/// </summary>
/// <param name="consumer">Callback receiving each footer key and value.</param>
/// <returns>This message instance.</returns>
public IMessage<Stream> ForEachFooter(Action<string, object> consumer)
{
    if (!HasFooters)
    {
        return this;
    }

    foreach (KeyValuePair<Symbol, object> footerEntry in footer.Value)
    {
        consumer(footerEntry.Key.ToString(), footerEntry.Value);
    }

    return this;
}
/// <summary>
/// Looks up a single footer entry by key.
/// </summary>
/// <param name="key">Footer name; it is converted to a Symbol for the lookup.</param>
/// <returns>The stored value, or null when there is no footer map or no such key.</returns>
public object GetFooter(string key)
{
    var entries = footer?.Value;
    if (entries == null)
    {
        return null;
    }

    // On a miss TryGetValue leaves the out value at its default, i.e. null.
    object found;
    entries.TryGetValue(Symbol.Lookup(key), out found);
    return found;
}
/// <summary>
/// Stores a footer entry under the given key, creating the footer section on demand.
/// </summary>
/// <param name="key">Footer name; it is converted to a Symbol.</param>
/// <param name="value">Value to store.</param>
/// <returns>This message instance.</returns>
/// <exception cref="ClientIllegalStateException">
/// Thrown (through LazyCreateFooter) once the message has been completed or aborted.
/// </exception>
public IMessage<Stream> SetFooter(string key, object value)
{
    Footer target = LazyCreateFooter();
    target.Value[Symbol.Lookup(key)] = value;
    return this;
}

/// <summary>
/// Reports whether a footer entry with the given key is present.
/// </summary>
/// <param name="key">Footer name; it is converted to a Symbol for the lookup.</param>
/// <returns>True when the key is present; false otherwise, including when no footer map exists.</returns>
public bool HasFooter(string key)
{
    bool? present = footer?.Value?.ContainsKey(Symbol.Lookup(key));
    return present ?? false;
}
/// <summary>
/// Removes a footer entry and hands back the value it held.
/// </summary>
/// <param name="key">Footer name; it is converted to a Symbol.</param>
/// <returns>The removed value, or null when there are no footers or the key was absent.</returns>
public object RemoveFooter(string key)
{
    if (!HasFooters)
    {
        return null;
    }

    var symbolKey = Symbol.Lookup(key);

    // Capture the current value first so it can be returned after the entry is removed.
    object previous;
    footer.Value.TryGetValue(symbolKey, out previous);
    footer.Value.Remove(symbolKey);

    return previous;
}
#endregion
#region AMQP Body access APIs
/// <summary>
/// Body stream of this message. Reading requests a stream from GetBodyStream using
/// default-constructed OutputStreamOptions; assigning a body is not supported.
/// </summary>
/// <exception cref="ClientUnsupportedOperationException">Always thrown by the setter.</exception>
public Stream Body
{
    get
    {
        return GetBodyStream(new OutputStreamOptions());
    }
    set
    {
        throw new ClientUnsupportedOperationException("Cannot set an OutputStream body on a StreamSenderMessage");
    }
}
/// <summary>
/// Currently a no-op: the consumer is never invoked.
/// </summary>
/// <param name="consumer">Unused.</param>
/// <returns>This message instance.</returns>
public IAdvancedMessage<Stream> ForEachBodySection(Action<ISection> consumer) => this;

/// <summary>
/// Not implemented yet; this always throws.
/// </summary>
/// <param name="section">Unused.</param>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public IAdvancedMessage<Stream> AddBodySection(ISection section) =>
    throw new NotImplementedException();

/// <summary>
/// Currently a no-op.
/// </summary>
/// <returns>This message instance.</returns>
public IAdvancedMessage<Stream> ClearBodySections() => this;
/// <summary>
/// Returns the body sections held by this message, which is always an empty sequence
/// in the current implementation.
/// </summary>
/// <returns>An empty enumeration of sections.</returns>
public IEnumerable<ISection> GetBodySections()
{
    // Array.Empty hands back a cached instance instead of allocating a new
    // zero-length array on every call.
    return Array.Empty<ISection>();
}
/// <summary>
/// Adds every section from the given enumeration to this message by calling
/// AddBodySection for each one, in enumeration order.
/// </summary>
/// <param name="sections">Sections to add; must not be null.</param>
/// <returns>This message instance.</returns>
/// <exception cref="ArgumentNullException">If <paramref name="sections"/> is null.</exception>
public IAdvancedMessage<Stream> SetBodySections(IEnumerable<ISection> sections)
{
    if (sections == null)
    {
        // The single-string constructor treats its argument as the parameter name, so the
        // two-argument overload is needed for the text to surface as the exception message.
        throw new ArgumentNullException(nameof(sections), "Cannot set body sections with a null enumeration");
    }

    foreach (ISection section in sections)
    {
        AddBodySection(section);
    }

    return this;
}
/// <summary>
/// Not implemented yet; this always throws.
/// </summary>
/// <param name="options">Unused.</param>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public Stream GetBodyStream(OutputStreamOptions options) =>
    throw new NotImplementedException();
#endregion
#region Private stream sender message stream state management
// Life-cycle states of a stream sender message.
//
// Declaration order is significant: the Footer setter and LazyCreateFooter compare with
// ">= COMPLETE" to mean "completed or aborted", so COMPLETE and ABORTED must stay last.
private enum StreamState
{
// Initial state; header, properties, annotations and message format may only be changed here.
PREAMBLE,
// NOTE(review): not assigned anywhere in the visible code — presumably entered once a body stream exists; confirm.
BODY_WRITABLE,
// NOTE(review): not assigned anywhere in the visible code — presumably entered once body bytes were written; confirm.
BODY_WRITTING,
// Set by Complete(); reported through the Completed property.
COMPLETE,
// Set by Abort(); reported through the Aborted property.
ABORTED
}
/// <summary>
/// Verifies that the message is currently in exactly the given state.
/// </summary>
/// <param name="state">The only state in which the caller's operation is allowed.</param>
/// <param name="errorMessage">Message for the exception raised on a mismatch.</param>
/// <exception cref="ClientIllegalStateException">If the current state differs from <paramref name="state"/>.</exception>
private void CheckStreamState(StreamState state, string errorMessage)
{
    if (currentState == state)
    {
        return;
    }

    throw new ClientIllegalStateException(errorMessage);
}
// Lazy section factories. Each one first validates the stream state and then returns the
// existing section, creating and storing a new one only when none has been set yet.

/// <summary>Returns the header, creating it if needed; only allowed in the PREAMBLE state.</summary>
/// <exception cref="ClientIllegalStateException">If the stream state is not PREAMBLE.</exception>
private Header LazyCreateHeader()
{
    CheckStreamState(StreamState.PREAMBLE, "Cannot write to Message Header after body writing has started.");

    if (header == null)
    {
        header = new Header();
    }

    return header;
}

/// <summary>Returns the properties section, creating it if needed; only allowed in the PREAMBLE state.</summary>
/// <exception cref="ClientIllegalStateException">If the stream state is not PREAMBLE.</exception>
private Properties LazyCreateProperties()
{
    CheckStreamState(StreamState.PREAMBLE, "Cannot write to Message Properties after body writing has started.");

    if (properties == null)
    {
        properties = new Properties();
    }

    return properties;
}

/// <summary>Returns the application properties section, creating it if needed; only allowed in the PREAMBLE state.</summary>
/// <exception cref="ClientIllegalStateException">If the stream state is not PREAMBLE.</exception>
private ApplicationProperties LazyCreateApplicationProperties()
{
    CheckStreamState(StreamState.PREAMBLE, "Cannot write to Message Application Properties after body writing has started.");

    if (applicationProperties == null)
    {
        applicationProperties = new ApplicationProperties();
    }

    return applicationProperties;
}

/// <summary>Returns the message annotations section, creating it if needed; only allowed in the PREAMBLE state.</summary>
/// <exception cref="ClientIllegalStateException">If the stream state is not PREAMBLE.</exception>
private MessageAnnotations LazyCreateMessageAnnotations()
{
    CheckStreamState(StreamState.PREAMBLE, "Cannot write to Message Annotations after body writing has started.");

    if (annotations == null)
    {
        annotations = new MessageAnnotations();
    }

    return annotations;
}

/// <summary>
/// Returns the footer, creating it if needed. Allowed in every state before COMPLETE.
/// </summary>
/// <exception cref="ClientIllegalStateException">If the message has been completed or aborted.</exception>
private Footer LazyCreateFooter()
{
    // Relies on COMPLETE and ABORTED being the last members of StreamState.
    if (currentState >= StreamState.COMPLETE)
    {
        throw new ClientIllegalStateException(
            "Cannot write to Message Footer after message has been marked completed or aborted.");
    }

    if (footer == null)
    {
        footer = new Footer();
    }

    return footer;
}
/// <summary>
/// Not implemented yet; this always throws.
/// </summary>
/// <param name="incoming">Unused.</param>
/// <exception cref="NotImplementedException">Always thrown.</exception>
private void AppendDataToBuffer(IProtonBuffer incoming) =>
    throw new NotImplementedException();
/// <summary>
/// Sends this message through the sender when there is readable buffered data, then drops
/// the buffer reference. Does nothing when the buffer is null or has nothing readable.
/// </summary>
private void DoFlush()
{
    if (buffer == null || !buffer.IsReadable)
    {
        return;
    }

    try
    {
        sender.SendMessage(this, streamMessagePacket);
    }
    finally
    {
        // The buffer reference is released whether or not the send threw.
        buffer = null;
    }
}
/// <summary>
/// Encodes the given section into a newly allocated buffer and appends the result to
/// this message's pending data.
/// </summary>
/// <param name="section">Section to encode and append.</param>
/// <returns>This message instance.</returns>
/// <exception cref="ClientIllegalStateException">If the message was already aborted or completed.</exception>
private ClientStreamSenderMessage Write(ISection section)
{
    // The aborted check comes first so an aborted message always reports the abort error.
    if (Aborted)
    {
        throw new ClientIllegalStateException("Cannot write a Section to an already aborted send context");
    }

    if (Completed)
    {
        throw new ClientIllegalStateException("Cannot write a Section to an already completed send context");
    }

    var scratch = ProtonByteBufferAllocator.Instance.Allocate();
    var encoded = ClientMessageSupport.EncodeSection(section, scratch);
    AppendDataToBuffer(encoded);

    return this;
}
#endregion
}
}