blob: 53e709224f7ff53345a1c9f586eb8334f916e6dd [file] [log] [blame]
#--
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#++
module Qpid::Proton
# A Message represents an addressable quantity of data.
#
# ==== Message Body
#
# The message body can be set using the #body= method. The message will
# then attempt to determine how exactly to encode the content.
#
# ==== Examples
#
# To create a message for sending:
#
# # send a simple text message
# msg = Qpid::Proton::Message.new
# msg.body = "STATE: update"
#
# # send a binary chunk of data
# data = File.binread("/home/qpid/binfile.tar.gz")
# msg = Qpid::Proton::Message.new
# msg.body = Qpid::Proton::BinaryString.new(data)
#
class Message
# @private
def proton_send(sender, tag = nil)
dlv = sender.delivery(tag || sender.delivery_tag)
encoded = self.encode
sender.stream(encoded)
sender.advance
dlv.settle if sender.snd_settle_mode == Link::SND_SETTLED
return dlv
end
# Decodes a message from supplied AMQP data and returns the number
# of bytes consumed.
#
# ==== Options
#
# * encoded - the encoded data
#
def decode(encoded)
check(Cproton.pn_message_decode(@impl, encoded, encoded.length))
post_decode
end
def post_decode # :nodoc:
# decode elements from the message
@properties = {}
props = Codec::Data.new(Cproton::pn_message_properties(@impl))
if props.next
@properties = props.type.get(props)
end
@instructions = nil
insts = Codec::Data.new(Cproton::pn_message_instructions(@impl))
if insts.next
@instructions = insts.type.get(insts)
end
@annotations = nil
annts = Codec::Data.new(Cproton::pn_message_annotations(@impl))
if annts.next
@annotations = annts.type.get(annts)
end
@body = nil
body = Codec::Data.new(Cproton::pn_message_body(@impl))
if body.next
@body = body.type.get(body)
end
end
# Encodes the message.
def encode
pre_encode
size = 16
loop do
error, data = Cproton::pn_message_encode(@impl, size)
if error == Qpid::Proton::Error::OVERFLOW
size *= 2
else
check(error)
return data
end
end
end
def pre_encode # :nodoc:
# encode elements from the message
props = Codec::Data.new(Cproton::pn_message_properties(@impl))
props.clear
Codec::Mapping.for_class(@properties.class).put(props, @properties) unless @properties.empty?
insts = Codec::Data.new(Cproton::pn_message_instructions(@impl))
insts.clear
if !@instructions.nil?
mapping = Codec::Mapping.for_class(@instructions.class)
mapping.put(insts, @instructions)
end
annts = Codec::Data.new(Cproton::pn_message_annotations(@impl))
annts.clear
if !@annotations.nil?
mapping = Codec::Mapping.for_class(@annotations.class)
mapping.put(annts, @annotations, :keys => :SYMBOL)
end
body = Codec::Data.new(Cproton::pn_message_body(@impl))
body.clear
if !@body.nil?
mapping = Codec::Mapping.for_class(@body.class)
mapping.put(body, @body)
end
end
# Creates a new +Message+ instance.
def initialize
@impl = Cproton.pn_message
ObjectSpace.define_finalizer(self, self.class.finalize!(@impl))
@properties = {}
@instructions = {}
@annotations = {}
@body = nil
end
def to_s
tmp = Cproton.pn_string("")
Cproton.pn_inspect(@impl, tmp)
result = Cproton.pn_string_get(tmp)
Cproton.pn_free(tmp)
return result
end
# Invoked by garbage collection to clean up resources used
# by the underlying message implementation.
def self.finalize!(impl) # :nodoc:
proc {
Cproton.pn_message_free(impl)
}
end
# Returns the underlying message implementation.
def impl # :nodoc:
@impl
end
# Clears the state of the +Message+. This allows a single instance of
# +Message+ to be reused.
#
def clear
Cproton.pn_message_clear(@impl)
@properties.clear unless @properties.nil?
@instructions.clear unless @instructions.nil?
@annotations.clear unless @annotations.nil?
@body = nil
end
# Returns the most recent error number.
#
def errno
Cproton.pn_message_errno(@impl)
end
# Returns the most recent error message.
#
def error
Cproton.pn_error_text(Cproton.pn_message_error(@impl))
end
# Returns whether there is currently an error reported.
#
def error?
!Cproton.pn_message_errno(@impl).zero?
end
# Sets the durable flag.
#
# See ::durable for more details on message durability.
#
# ==== Options
#
# * state - the durable state
#
def durable=(state)
raise TypeError.new("state cannot be nil") if state.nil?
Cproton.pn_message_set_durable(@impl, state)
end
# Returns the durable property.
#
# The durable property indicates that the emessage should be held durably
# by any intermediaries taking responsibility for the message.
#
# ==== Examples
#
# msg = Qpid::Proton::Message.new
# msg.durable = true
#
def durable
Cproton.pn_message_is_durable(@impl)
end
# Sets the priority.
#
# +NOTE:+ Priority values are limited to the range [0,255].
#
# ==== Options
#
# * priority - the priority value
#
def priority=(priority)
raise TypeError.new("invalid priority: #{priority}") if priority.nil? || !([Float, Fixnum].include?(priority.class))
raise RangeError.new("priority out of range: #{priority}") if ((priority > 255) || (priority < 0))
Cproton.pn_message_set_priority(@impl, priority.floor)
end
# Returns the priority.
#
def priority
Cproton.pn_message_get_priority(@impl)
end
# Sets the time-to-live for the message.
#
# ==== Options
#
# * time - the time in milliseconds
#
def ttl=(time)
raise TypeError.new("invalid ttl: #{time}") if time.nil? || !([Float, Fixnum].include?(time.class))
raise RangeError.new("time out of range: #{time}") if ((time < 0))
Cproton.pn_message_set_ttl(@impl, time.floor)
end
# Returns the time-to-live, in milliseconds.
#
def ttl
Cproton.pn_message_get_ttl(@impl)
end
# Sets whether this is the first time the message was acquired.
#
# See ::first_acquirer? for more details.
#
# ==== Options
#
# * state - true if claiming the message
#
def first_acquirer=(state)
raise TypeError.new("invalid state: #{state}") if state.nil? || !([TrueClass, FalseClass].include?(state.class))
Cproton.pn_message_set_first_acquirer(@impl, state)
end
# Sets the delivery count for the message.
#
# See ::delivery_count for more details.
#
# ==== Options
#
# * count - the delivery count
#
def delivery_count=(count)
raise ::ArgumentError.new("invalid count: #{count}") if count.nil? || !([Float, Fixnum].include?(count.class))
raise RangeError.new("count out of range: #{count}") if count < 0
Cproton.pn_message_set_delivery_count(@impl, count.floor)
end
# Returns the delivery count for the message.
#
# This is the number of delivery attempts for the given message.
#
def delivery_count
Cproton.pn_message_get_delivery_count(@impl)
end
# Returns whether this is the first acquirer.
#
#
def first_acquirer?
Cproton.pn_message_is_first_acquirer(@impl)
end
# Sets the message id.
#
# ==== Options
#
# * id = the id
#
def id=(id)
Cproton.pn_message_set_id(@impl, id)
end
# Returns the message id.
#
def id
Cproton.pn_message_get_id(@impl)
end
# Sets the user id.
#
# ==== Options
#
# * id - the user id
#
def user_id=(id)
Cproton.pn_message_set_user_id(@impl, id)
end
# Returns the user id.
#
def user_id
Cproton.pn_message_get_user_id(@impl)
end
# Sets the destination address.
#
# ==== Options
#
# * address - the address
#
def address=(address)
Cproton.pn_message_set_address(@impl, address)
end
# Returns the destination address.
#
def address
Cproton.pn_message_get_address(@impl)
end
# Sets the subject.
#
# ==== Options
#
# * subject - the subject
#
def subject=(subject)
Cproton.pn_message_set_subject(@impl, subject)
end
# Returns the subject
#
def subject
Cproton.pn_message_get_subject(@impl)
end
# Sets the reply-to address.
#
# ==== Options
#
# * address - the reply-to address
#
def reply_to=(address)
Cproton.pn_message_set_reply_to(@impl, address)
end
# Returns the reply-to address
#
def reply_to
Cproton.pn_message_get_reply_to(@impl)
end
# Sets the correlation id.
#
# ==== Options
#
# * id - the correlation id
#
def correlation_id=(id)
Cproton.pn_message_set_correlation_id(@impl, id)
end
# Returns the correlation id.
#
def correlation_id
Cproton.pn_message_get_correlation_id(@impl)
end
# Sets the content type.
#
# ==== Options
#
# * content_type - the content type
#
def content_type=(content_type)
Cproton.pn_message_set_content_type(@impl, content_type)
end
# Returns the content type
#
def content_type
Cproton.pn_message_get_content_type(@impl)
end
# Sets the message content.
#
# *WARNING:* This method has been deprecated. Please use #body= instead to
# set the content of a message.
#
# ==== Options
#
# * content - the content
#
def content=(content)
Cproton.pn_message_load(@impl, content)
end
# Returns the message content.
#
# *WARNING:* This method has been deprecated. Please use #body instead to
# retrieve the content of a message.
#
def content
size = 16
loop do
result = Cproton.pn_message_save(@impl, size)
error = result[0]
data = result[1]
if error == Qpid::Proton::Error::OVERFLOW
size = size * 2
else
check(error)
return data
end
end
end
# Sets the content encoding type.
#
# ==== Options
#
# * encoding - the content encoding
#
def content_encoding=(encoding)
Cproton.pn_message_set_content_encoding(@impl, encoding)
end
# Returns the content encoding type.
#
def content_encoding
Cproton.pn_message_get_content_encoding(@impl)
end
# Sets the expiration time.
#
# ==== Options
#
# * time - the expiry time
#
def expires=(time)
raise TypeError.new("invalid expiry time: #{time}") if time.nil?
raise ::ArgumentError.new("expiry time cannot be negative: #{time}") if time < 0
Cproton.pn_message_set_expiry_time(@impl, time)
end
# Returns the expiration time.
#
def expires
Cproton.pn_message_get_expiry_time(@impl)
end
# Sets the creation time.
#
# ==== Options
#
# * time - the creation time
#
def creation_time=(time)
raise TypeError.new("invalid time: #{time}") if time.nil?
raise ::ArgumentError.new("time cannot be negative") if time < 0
Cproton.pn_message_set_creation_time(@impl, time)
end
# Returns the creation time.
#
def creation_time
Cproton.pn_message_get_creation_time(@impl)
end
# Sets the group id.
#
# ==== Options
#
# * id - the group id
#
def group_id=(id)
Cproton.pn_message_set_group_id(@impl, id)
end
# Returns the group id.
#
def group_id
Cproton.pn_message_get_group_id(@impl)
end
# Sets the group sequence number.
#
# ==== Options
#
# * seq - the sequence number
#
def group_sequence=(seq)
raise TypeError.new("invalid seq: #{seq}") if seq.nil?
Cproton.pn_message_set_group_sequence(@impl, seq)
end
# Returns the group sequence number.
#
def group_sequence
Cproton.pn_message_get_group_sequence(@impl)
end
# Sets the reply-to group id.
#
# ==== Options
#
# * id - the id
#
def reply_to_group_id=(id)
Cproton.pn_message_set_reply_to_group_id(@impl, id)
end
# Returns the reply-to group id.
#
def reply_to_group_id
Cproton.pn_message_get_reply_to_group_id(@impl)
end
# Returns the list of property names for associated with this message.
#
# ==== Examples
#
# msg.properties.each do |name|
# end
#
def properties
@properties
end
# Replaces the entire set of properties with the specified hash.
#
def properties=(properties)
@properties = properties
end
# Assigns the value given to the named property.
#
# ==== Arguments
#
# * name - the property name
# * value - the property value
#
def []=(name, value)
@properties[name] = value
end
# Retrieves the value for the specified property name. If not found, then
# it returns nil.
#
def [](name)
@properties[name]
end
# Deletes the named property.
#
def delete_property(name)
@properties.delete(name)
end
# Returns the instructions for this message.
#
def instructions
@instructions
end
# Assigns instructions to this message.
#
def instructions=(instr)
@instructions = instr
end
# Returns the annotations for this message.
#
def annotations
@annotations
end
# Assigns annotations to this message.
#
def annotations=(annotations)
@annotations = annotations
end
# Returns the body property of the message.
#
def body
@body
end
# Assigns a new value to the body of the message.
#
def body=(body)
@body = body
end
private
def check(err) # :nodoc:
if err < 0
raise DataError, "[#{err}]: #{Cproton.pn_message_error(@data)}"
else
return err
end
end
end
end