| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| */ |
| |
| package amqp |
| |
| // #include <proton/codec.h> |
| // #include <proton/types.h> |
| // #include <proton/message.h> |
| // #include <stdlib.h> |
| // |
| // /* Helper for setting message string fields */ |
| // typedef int (*set_fn)(pn_message_t*, const char*); |
| // int msg_set_str(pn_message_t* m, char* s, set_fn set) { |
| // int result = set(m, s); |
| // free(s); |
| // return result; |
| // } |
| // |
| import "C" |
| |
| import ( |
| "bytes" |
| "fmt" |
| "reflect" |
| "time" |
| ) |
| |
| // Message is the interface to an AMQP message. |
| type Message interface { |
| // Durable indicates that any parties taking responsibility |
| // for the message must durably store the content. |
| Durable() bool |
| SetDurable(bool) |
| |
| // Priority impacts ordering guarantees. Within a |
| // given ordered context, higher priority messages may jump ahead of |
| // lower priority messages. |
| Priority() uint8 |
| SetPriority(uint8) |
| |
| // TTL or Time To Live, a message it may be dropped after this duration |
| TTL() time.Duration |
| SetTTL(time.Duration) |
| |
| // FirstAcquirer indicates |
| // that the recipient of the message is the first recipient to acquire |
| // the message, i.e. there have been no failed delivery attempts to |
| // other acquirers. Note that this does not mean the message has not |
| // been delivered to, but not acquired, by other recipients. |
| FirstAcquirer() bool |
| SetFirstAcquirer(bool) |
| |
| // DeliveryCount tracks how many attempts have been made to |
| // delivery a message. |
| DeliveryCount() uint32 |
| SetDeliveryCount(uint32) |
| |
| // MessageId provides a unique identifier for a message. |
| // it can be an a string, an unsigned long, a uuid or a |
| // binary value. |
| MessageId() interface{} |
| SetMessageId(interface{}) |
| |
| UserId() string |
| SetUserId(string) |
| |
| Address() string |
| SetAddress(string) |
| |
| Subject() string |
| SetSubject(string) |
| |
| ReplyTo() string |
| SetReplyTo(string) |
| |
| // CorrelationId is set on correlated request and response messages. It can be |
| // an a string, an unsigned long, a uuid or a binary value. |
| CorrelationId() interface{} |
| SetCorrelationId(interface{}) |
| |
| ContentType() string |
| SetContentType(string) |
| |
| ContentEncoding() string |
| SetContentEncoding(string) |
| |
| // ExpiryTime indicates an absolute time when the message may be dropped. |
| // A Zero time (i.e. t.isZero() == true) indicates a message never expires. |
| ExpiryTime() time.Time |
| SetExpiryTime(time.Time) |
| |
| CreationTime() time.Time |
| SetCreationTime(time.Time) |
| |
| GroupId() string |
| SetGroupId(string) |
| |
| GroupSequence() int32 |
| SetGroupSequence(int32) |
| |
| ReplyToGroupId() string |
| SetReplyToGroupId(string) |
| |
| // Properties set by the application to be carried with the message. |
| // Values must be simple types (not maps, lists or sequences) |
| ApplicationProperties() map[string]interface{} |
| SetApplicationProperties(map[string]interface{}) |
| |
| // Per-delivery annotations to provide delivery instructions. |
| // May be added or removed by intermediaries during delivery. |
| // See ApplicationProperties() for properties set by the application. |
| DeliveryAnnotations() map[AnnotationKey]interface{} |
| SetDeliveryAnnotations(map[AnnotationKey]interface{}) |
| |
| // Message annotations added as part of the bare message at creation, usually |
| // by an AMQP library. See ApplicationProperties() for properties set by the application. |
| MessageAnnotations() map[AnnotationKey]interface{} |
| SetMessageAnnotations(map[AnnotationKey]interface{}) |
| |
| // Inferred indicates how the message content |
| // is encoded into AMQP sections. If inferred is true then binary and |
| // list values in the body of the message will be encoded as AMQP DATA |
| // and AMQP SEQUENCE sections, respectively. If inferred is false, |
| // then all values in the body of the message will be encoded as AMQP |
| // VALUE sections regardless of their type. |
| Inferred() bool |
| SetInferred(bool) |
| |
| // Get the message body, using the amqp.Unmarshal() rules for interface{} |
| Body() interface{} |
| |
| // Set the body using amqp.Marshal() |
| SetBody(interface{}) |
| |
| // Marshal a Go value into the message body, synonym for SetBody() |
| Marshal(interface{}) |
| |
| // Unmarshal the message body, using amqp.Unmarshal() |
| Unmarshal(interface{}) |
| |
| // Encode encodes the message as AMQP data. If buffer is non-nil and is large enough |
| // the message is encoded into it, otherwise a new buffer is created. |
| // Returns the buffer containing the message. |
| Encode(buffer []byte) ([]byte, error) |
| |
| // Decode data into this message. Overwrites an existing message content. |
| Decode(buffer []byte) error |
| |
| // Clear the message contents, set all fields to the default value. |
| Clear() |
| |
| // Copy the contents of another message to this one. |
| Copy(m Message) error |
| |
| // Deprecated: use DeliveryAnnotations() for a more type-safe interface |
| Instructions() map[string]interface{} |
| SetInstructions(v map[string]interface{}) |
| |
| // Deprecated: use MessageAnnotations() for a more type-safe interface |
| Annotations() map[string]interface{} |
| SetAnnotations(v map[string]interface{}) |
| |
| // Deprecated: use ApplicationProperties() for a more type-safe interface |
| Properties() map[string]interface{} |
| SetProperties(v map[string]interface{}) |
| |
| // Human-readable string showing message contents and properties |
| String() string |
| } |
| |
| // NewMessage creates a new message instance. |
| func NewMessage() Message { |
| m := &message{} |
| m.Clear() |
| return m |
| } |
| |
| // NewMessageWith creates a message with value as the body. |
| func NewMessageWith(value interface{}) Message { |
| m := NewMessage() |
| m.SetBody(value) |
| return m |
| } |
| |
| // NewMessageCopy creates a copy of an existing message. |
| func NewMessageCopy(m Message) Message { |
| m2 := NewMessage() |
| m2.Copy(m) |
| return m2 |
| } |
| |
| // Reset message to all default values |
| func (m *message) Clear() { *m = message{priority: 4} } |
| |
| // Copy makes a deep copy of message x |
| func (m *message) Copy(x Message) error { |
| var mc MessageCodec |
| bytes, err := mc.Encode(x, nil) |
| if err == nil { |
| err = mc.Decode(m, bytes) |
| } |
| return err |
| } |
| |
| type message struct { |
| address string |
| applicationProperties map[string]interface{} |
| contentEncoding string |
| contentType string |
| correlationId interface{} |
| creationTime time.Time |
| deliveryAnnotations map[AnnotationKey]interface{} |
| deliveryCount uint32 |
| durable bool |
| expiryTime time.Time |
| firstAcquirer bool |
| groupId string |
| groupSequence int32 |
| inferred bool |
| messageAnnotations map[AnnotationKey]interface{} |
| messageId interface{} |
| priority uint8 |
| replyTo string |
| replyToGroupId string |
| subject string |
| ttl time.Duration |
| userId string |
| body interface{} |
| // Keep the original data to support Unmarshal to a non-interface{} type |
| // Waste of memory, consider deprecating or making it optional. |
| pnBody *C.pn_data_t |
| } |
| |
| // ==== message get methods |
| func (m *message) Body() interface{} { return m.body } |
| func (m *message) Inferred() bool { return m.inferred } |
| func (m *message) Durable() bool { return m.durable } |
| func (m *message) Priority() uint8 { return m.priority } |
| func (m *message) TTL() time.Duration { return m.ttl } |
| func (m *message) FirstAcquirer() bool { return m.firstAcquirer } |
| func (m *message) DeliveryCount() uint32 { return m.deliveryCount } |
| func (m *message) MessageId() interface{} { return m.messageId } |
| func (m *message) UserId() string { return m.userId } |
| func (m *message) Address() string { return m.address } |
| func (m *message) Subject() string { return m.subject } |
| func (m *message) ReplyTo() string { return m.replyTo } |
| func (m *message) CorrelationId() interface{} { return m.correlationId } |
| func (m *message) ContentType() string { return m.contentType } |
| func (m *message) ContentEncoding() string { return m.contentEncoding } |
| func (m *message) ExpiryTime() time.Time { return m.expiryTime } |
| func (m *message) CreationTime() time.Time { return m.creationTime } |
| func (m *message) GroupId() string { return m.groupId } |
| func (m *message) GroupSequence() int32 { return m.groupSequence } |
| func (m *message) ReplyToGroupId() string { return m.replyToGroupId } |
| |
| func (m *message) DeliveryAnnotations() map[AnnotationKey]interface{} { |
| if m.deliveryAnnotations == nil { |
| m.deliveryAnnotations = make(map[AnnotationKey]interface{}) |
| } |
| return m.deliveryAnnotations |
| } |
| func (m *message) MessageAnnotations() map[AnnotationKey]interface{} { |
| if m.messageAnnotations == nil { |
| m.messageAnnotations = make(map[AnnotationKey]interface{}) |
| } |
| return m.messageAnnotations |
| } |
| func (m *message) ApplicationProperties() map[string]interface{} { |
| if m.applicationProperties == nil { |
| m.applicationProperties = make(map[string]interface{}) |
| } |
| return m.applicationProperties |
| } |
| |
| // ==== message set methods |
| |
| func (m *message) SetBody(v interface{}) { m.body = v } |
| func (m *message) SetInferred(x bool) { m.inferred = x } |
| func (m *message) SetDurable(x bool) { m.durable = x } |
| func (m *message) SetPriority(x uint8) { m.priority = x } |
| func (m *message) SetTTL(x time.Duration) { m.ttl = x } |
| func (m *message) SetFirstAcquirer(x bool) { m.firstAcquirer = x } |
| func (m *message) SetDeliveryCount(x uint32) { m.deliveryCount = x } |
| func (m *message) SetMessageId(x interface{}) { m.messageId = x } |
| func (m *message) SetUserId(x string) { m.userId = x } |
| func (m *message) SetAddress(x string) { m.address = x } |
| func (m *message) SetSubject(x string) { m.subject = x } |
| func (m *message) SetReplyTo(x string) { m.replyTo = x } |
| func (m *message) SetCorrelationId(x interface{}) { m.correlationId = x } |
| func (m *message) SetContentType(x string) { m.contentType = x } |
| func (m *message) SetContentEncoding(x string) { m.contentEncoding = x } |
| func (m *message) SetExpiryTime(x time.Time) { m.expiryTime = x } |
| func (m *message) SetCreationTime(x time.Time) { m.creationTime = x } |
| func (m *message) SetGroupId(x string) { m.groupId = x } |
| func (m *message) SetGroupSequence(x int32) { m.groupSequence = x } |
| func (m *message) SetReplyToGroupId(x string) { m.replyToGroupId = x } |
| |
| func (m *message) SetDeliveryAnnotations(x map[AnnotationKey]interface{}) { |
| m.deliveryAnnotations = x |
| } |
| func (m *message) SetMessageAnnotations(x map[AnnotationKey]interface{}) { |
| m.messageAnnotations = x |
| } |
| func (m *message) SetApplicationProperties(x map[string]interface{}) { |
| m.applicationProperties = x |
| } |
| |
| // Marshal body from v, same as SetBody(v). See amqp.Marshal. |
| func (m *message) Marshal(v interface{}) { m.body = v } |
| |
| func (m *message) Unmarshal(v interface{}) { |
| pnData := C.pn_data(2) |
| defer C.pn_data_free(pnData) |
| marshal(m.body, pnData) |
| unmarshal(v, pnData) |
| } |
| |
| // Internal use only |
| type MessageCodec struct { |
| pn *C.pn_message_t // Cache a pn_message_t to speed up encode/decode |
| // Optionally remember a byte buffer to use with MessageCodec methods. |
| Buffer []byte |
| } |
| |
| func (mc *MessageCodec) pnMessage() *C.pn_message_t { |
| if mc.pn == nil { |
| mc.pn = C.pn_message() |
| } |
| return mc.pn |
| } |
| |
| func (mc *MessageCodec) Close() { |
| if mc.pn != nil { |
| C.pn_message_free(mc.pn) |
| mc.pn = nil |
| } |
| } |
| |
| func (mc *MessageCodec) Decode(m Message, data []byte) error { |
| pn := mc.pnMessage() |
| if C.pn_message_decode(pn, cPtr(data), cLen(data)) < 0 { |
| return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(pn))) |
| } |
| m.(*message).get(pn) |
| return nil |
| } |
| |
| func (m *message) Decode(data []byte) error { |
| var mc MessageCodec |
| defer mc.Close() |
| return mc.Decode(m, data) |
| } |
| |
| func DecodeMessage(data []byte) (m Message, err error) { |
| m = NewMessage() |
| err = m.Decode(data) |
| return |
| } |
| |
| // Encode m using buffer. Return the final buffer used to hold m, |
| // may be different if the initial buffer was not large enough. |
| func (mc *MessageCodec) Encode(m Message, buffer []byte) ([]byte, error) { |
| pn := mc.pnMessage() |
| m.(*message).put(pn) |
| encode := func(buf []byte) ([]byte, error) { |
| len := cLen(buf) |
| result := C.pn_message_encode(pn, cPtr(buf), &len) |
| switch { |
| case result == C.PN_OVERFLOW: |
| return buf, overflow |
| case result < 0: |
| return buf, fmt.Errorf("cannot encode message: %s", PnErrorCode(result)) |
| default: |
| return buf[:len], nil |
| } |
| } |
| return encodeGrow(buffer, encode) |
| } |
| |
| func (m *message) Encode(buffer []byte) ([]byte, error) { |
| var mc MessageCodec |
| defer mc.Close() |
| return mc.Encode(m, buffer) |
| } |
| |
| // TODO aconway 2015-09-14: Multi-section messages. |
| |
| type ignoreFunc func(v interface{}) bool |
| |
| func isNil(v interface{}) bool { return v == nil } |
| func isZero(v interface{}) bool { return v == reflect.Zero(reflect.TypeOf(v)).Interface() } |
| func isEmpty(v interface{}) bool { return reflect.ValueOf(v).Len() == 0 } |
| |
| type stringBuilder struct { |
| bytes.Buffer |
| separator string |
| } |
| |
| func (b *stringBuilder) field(name string, value interface{}, ignore ignoreFunc) { |
| if !ignore(value) { |
| b.WriteString(b.separator) |
| b.separator = ", " |
| b.WriteString(name) |
| b.WriteString(": ") |
| fmt.Fprintf(&b.Buffer, "%v", value) |
| } |
| } |
| |
| // Human-readable string describing message. |
| // Includes only message fields with non-default values. |
| func (m *message) String() string { |
| var b stringBuilder |
| b.WriteString("Message{") |
| b.field("address", m.address, isEmpty) |
| b.field("durable", m.durable, isZero) |
| // Priority has weird default |
| b.field("priority", m.priority, func(v interface{}) bool { return v.(uint8) == 4 }) |
| b.field("ttl", m.ttl, isZero) |
| b.field("first-acquirer", m.firstAcquirer, isZero) |
| b.field("delivery-count", m.deliveryCount, isZero) |
| b.field("message-id", m.messageId, isNil) |
| b.field("user-id", m.userId, isEmpty) |
| b.field("subject", m.subject, isEmpty) |
| b.field("reply-to", m.replyTo, isEmpty) |
| b.field("correlation-id", m.correlationId, isNil) |
| b.field("content-type", m.contentType, isEmpty) |
| b.field("content-encoding", m.contentEncoding, isEmpty) |
| b.field("expiry-time", m.expiryTime, isZero) |
| b.field("creation-time", m.creationTime, isZero) |
| b.field("group-id", m.groupId, isEmpty) |
| b.field("group-sequence", m.groupSequence, isZero) |
| b.field("reply-to-group-id", m.replyToGroupId, isEmpty) |
| b.field("inferred", m.inferred, isZero) |
| b.field("delivery-annotations", m.deliveryAnnotations, isEmpty) |
| b.field("message-annotations", m.messageAnnotations, isEmpty) |
| b.field("application-properties", m.applicationProperties, isEmpty) |
| b.field("body", m.body, isNil) |
| b.WriteString("}") |
| return b.String() |
| } |
| |
| // ==== get message from pn_message_t |
| |
| func getData(v interface{}, data *C.pn_data_t) { |
| if data != nil && C.pn_data_size(data) > 0 { |
| C.pn_data_rewind(data) |
| C.pn_data_next(data) |
| unmarshal(v, data) |
| } |
| return |
| } |
| |
| func getString(c *C.char) string { |
| if c == nil { |
| return "" |
| } |
| return C.GoString(c) |
| } |
| |
| func (m *message) get(pn *C.pn_message_t) { |
| m.Clear() |
| m.inferred = bool(C.pn_message_is_inferred(pn)) |
| m.durable = bool(C.pn_message_is_durable(pn)) |
| m.priority = uint8(C.pn_message_get_priority(pn)) |
| m.ttl = goDuration(C.pn_message_get_ttl(pn)) |
| m.firstAcquirer = bool(C.pn_message_is_first_acquirer(pn)) |
| m.deliveryCount = uint32(C.pn_message_get_delivery_count(pn)) |
| getData(&m.messageId, C.pn_message_id(pn)) |
| m.userId = string(goBytes(C.pn_message_get_user_id(pn))) |
| m.address = getString(C.pn_message_get_address(pn)) |
| m.subject = getString(C.pn_message_get_subject(pn)) |
| m.replyTo = getString(C.pn_message_get_reply_to(pn)) |
| getData(&m.correlationId, C.pn_message_correlation_id(pn)) |
| m.contentType = getString(C.pn_message_get_content_type(pn)) |
| m.contentEncoding = getString(C.pn_message_get_content_encoding(pn)) |
| m.expiryTime = goTime(C.pn_message_get_expiry_time(pn)) |
| m.creationTime = goTime(C.pn_message_get_creation_time(pn)) |
| m.groupId = getString(C.pn_message_get_group_id(pn)) |
| m.groupSequence = int32(C.pn_message_get_group_sequence(pn)) |
| m.replyToGroupId = getString(C.pn_message_get_reply_to_group_id(pn)) |
| getData(&m.deliveryAnnotations, C.pn_message_instructions(pn)) |
| getData(&m.messageAnnotations, C.pn_message_annotations(pn)) |
| getData(&m.applicationProperties, C.pn_message_properties(pn)) |
| getData(&m.body, C.pn_message_body(pn)) |
| } |
| |
| // ==== put message to pn_message_t |
| |
| func putData(v interface{}, pn *C.pn_data_t) { |
| if v != nil { |
| C.pn_data_clear(pn) |
| marshal(v, pn) |
| } |
| } |
| |
| // For pointer-based fields (pn_data_t, strings, bytes) only |
| // put a field if it has a non-empty value |
| func (m *message) put(pn *C.pn_message_t) { |
| C.pn_message_clear(pn) |
| C.pn_message_set_inferred(pn, C.bool(m.inferred)) |
| C.pn_message_set_durable(pn, C.bool(m.durable)) |
| C.pn_message_set_priority(pn, C.uint8_t(m.priority)) |
| C.pn_message_set_ttl(pn, pnDuration(m.ttl)) |
| C.pn_message_set_first_acquirer(pn, C.bool(m.firstAcquirer)) |
| C.pn_message_set_delivery_count(pn, C.uint32_t(m.deliveryCount)) |
| putData(m.messageId, C.pn_message_id(pn)) |
| if m.userId != "" { |
| C.pn_message_set_user_id(pn, pnBytes(([]byte)(m.userId))) |
| } |
| if m.address != "" { |
| C.pn_message_set_address(pn, C.CString(m.address)) |
| } |
| if m.subject != "" { |
| C.pn_message_set_subject(pn, C.CString(m.subject)) |
| } |
| if m.replyTo != "" { |
| C.pn_message_set_reply_to(pn, C.CString(m.replyTo)) |
| } |
| putData(m.correlationId, C.pn_message_correlation_id(pn)) |
| if m.contentType != "" { |
| C.pn_message_set_content_type(pn, C.CString(m.contentType)) |
| } |
| if m.contentEncoding != "" { |
| C.pn_message_set_content_encoding(pn, C.CString(m.contentEncoding)) |
| } |
| C.pn_message_set_expiry_time(pn, pnTime(m.expiryTime)) |
| C.pn_message_set_creation_time(pn, pnTime(m.creationTime)) |
| if m.groupId != "" { |
| C.pn_message_set_group_id(pn, C.CString(m.groupId)) |
| } |
| C.pn_message_set_group_sequence(pn, C.pn_sequence_t(m.groupSequence)) |
| if m.replyToGroupId != "" { |
| C.pn_message_set_reply_to_group_id(pn, C.CString(m.replyToGroupId)) |
| } |
| if len(m.deliveryAnnotations) != 0 { |
| putData(m.deliveryAnnotations, C.pn_message_instructions(pn)) |
| } |
| if len(m.messageAnnotations) != 0 { |
| putData(m.messageAnnotations, C.pn_message_annotations(pn)) |
| } |
| if len(m.applicationProperties) != 0 { |
| putData(m.applicationProperties, C.pn_message_properties(pn)) |
| } |
| putData(m.body, C.pn_message_body(pn)) |
| } |
| |
| // ==== Deprecated functions |
| |
| func oldAnnotations(in map[AnnotationKey]interface{}) (out map[string]interface{}) { |
| if len(in) == 0 { |
| return nil |
| } |
| out = make(map[string]interface{}) |
| for k, v := range in { |
| out[k.String()] = v |
| } |
| return |
| } |
| |
| func (m *message) Instructions() map[string]interface{} { |
| return oldAnnotations(m.deliveryAnnotations) |
| } |
| func (m *message) Annotations() map[string]interface{} { |
| return oldAnnotations(m.messageAnnotations) |
| } |
| func (m *message) Properties() map[string]interface{} { |
| return m.applicationProperties |
| } |
| |
| // Convert old string-keyed annotations to an AnnotationKey map |
| func newAnnotations(in map[string]interface{}) (out map[AnnotationKey]interface{}) { |
| if len(in) == 0 { |
| return nil |
| } |
| out = make(map[AnnotationKey]interface{}) |
| for k, v := range in { |
| out[AnnotationKeyString(k)] = v |
| } |
| return |
| } |
| |
| func (m *message) SetInstructions(v map[string]interface{}) { |
| m.deliveryAnnotations = newAnnotations(v) |
| } |
| func (m *message) SetAnnotations(v map[string]interface{}) { |
| m.messageAnnotations = newAnnotations(v) |
| } |
| func (m *message) SetProperties(v map[string]interface{}) { |
| m.applicationProperties = v |
| } |