blob: d406beadcb53daa8b5ebf47c1f779a67b2cce987 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from org.apache.qpid.proton import Proton
from org.apache.qpid.proton.amqp.messaging import AmqpValue, AmqpSequence, \
Data as DataSection, ApplicationProperties, MessageAnnotations, DeliveryAnnotations
from ccodec import *
from cerror import *
from org.apache.qpid.proton.amqp import Binary
# from proton/message.h
PN_DATA = 0
PN_TEXT = 1
PN_AMQP = 2
PN_JSON = 3
PN_DEFAULT_PRIORITY = 4
class pn_message_wrapper:
def __init__(self):
self.inferred = False
self.impl = Proton.message()
self.id = pn_data(0)
self.correlation_id = pn_data(0)
self.instructions = pn_data(0)
self.annotations = pn_data(0)
self.properties = pn_data(0)
self.body = pn_data(0)
def decode(self, impl):
self.impl = impl
self.post_decode()
def post_decode(self):
obj2dat(self.impl.getMessageId(), self.id)
self.id.next()
obj2dat(self.impl.getCorrelationId(), self.correlation_id)
self.correlation_id.next()
def peel(x):
if x is not None:
return x.getValue()
return None
obj2dat(peel(self.impl.getDeliveryAnnotations()), self.instructions)
obj2dat(peel(self.impl.getMessageAnnotations()), self.annotations)
obj2dat(peel(self.impl.getApplicationProperties()), self.properties)
bod = self.impl.getBody()
if bod is not None: bod = bod.getValue()
obj2dat(bod, self.body)
def pre_encode(self):
self.impl.setMessageId(dat2obj(self.id))
self.impl.setCorrelationId(dat2obj(self.correlation_id))
def wrap(x, wrapper):
if x is not None:
return wrapper(x)
return None
self.impl.setDeliveryAnnotations(wrap(dat2obj(self.instructions), DeliveryAnnotations))
self.impl.setMessageAnnotations(wrap(dat2obj(self.annotations), MessageAnnotations))
self.impl.setApplicationProperties(wrap(dat2obj(self.properties), ApplicationProperties))
bod = dat2obj(self.body)
if self.inferred:
if isinstance(bod, bytes):
bod = DataSection(Binary(array(bod, 'b')))
elif isinstance(bod, list):
bod = AmqpSequence(bod)
else:
bod = AmqpValue(bod)
else:
bod = AmqpValue(bod)
self.impl.setBody(bod)
def __repr__(self):
return self.impl.toString()
def pn_message():
return pn_message_wrapper()
def pn_message_id(msg):
return msg.id
def pn_message_correlation_id(msg):
return msg.correlation_id
def pn_message_get_address(msg):
return msg.impl.getAddress()
def pn_message_set_address(msg, address):
msg.impl.setAddress(address)
return 0
def pn_message_get_reply_to(msg):
return msg.impl.getReplyTo()
def pn_message_set_reply_to(msg, address):
msg.impl.setReplyTo(address)
return 0
def pn_message_get_reply_to_group_id(msg):
return msg.impl.getReplyToGroupId()
def pn_message_set_reply_to_group_id(msg, id):
msg.impl.setReplyToGroupId(id)
return 0
def pn_message_get_group_sequence(msg):
return msg.impl.getGroupSequence()
def pn_message_set_group_sequence(msg, seq):
msg.impl.setGroupSequence(seq)
return 0
def pn_message_get_group_id(msg):
return msg.impl.getGroupId()
def pn_message_set_group_id(msg, id):
msg.impl.setGroupId(id)
return 0
def pn_message_is_first_acquirer(msg):
return msg.impl.isFirstAcquirer()
def pn_message_set_first_acquirer(msg, b):
msg.impl.setFirstAcquirer(b)
return 0
def pn_message_is_durable(msg):
return msg.impl.isDurable()
def pn_message_set_durable(msg, b):
msg.impl.setDurable(b)
return 0
def pn_message_get_delivery_count(msg):
return msg.impl.getDeliveryCount()
def pn_message_set_delivery_count(msg, c):
msg.impl.setDeliveryCount(c)
return 0
def pn_message_get_creation_time(msg):
return msg.impl.getCreationTime()
def pn_message_set_creation_time(msg, t):
msg.impl.setCreationTime(t)
return 0
def pn_message_get_expiry_time(msg):
return msg.impl.getExpiryTime()
def pn_message_set_expiry_time(msg, t):
msg.impl.setExpiryTime(t)
return 0
def pn_message_get_content_type(msg):
return msg.impl.getContentType()
def pn_message_set_content_type(msg, ct):
msg.impl.setContentType(ct)
return 0
def pn_message_get_content_encoding(msg):
return msg.impl.getContentEncoding()
def pn_message_set_content_encoding(msg, ct):
msg.impl.setContentEncoding(ct)
return 0
def pn_message_get_subject(msg):
return msg.impl.getSubject()
def pn_message_set_subject(msg, value):
msg.impl.setSubject(value)
return 0
def pn_message_get_priority(msg):
return msg.impl.getPriority()
def pn_message_set_priority(msg, p):
msg.impl.setPriority(p)
return 0
def pn_message_get_ttl(msg):
return msg.impl.getTtl()
def pn_message_set_ttl(msg, ttl):
msg.impl.setTtl(ttl)
return 0
def pn_message_get_user_id(msg):
uid = msg.impl.getUserId()
if uid is None:
return ""
else:
return uid.tostring()
def pn_message_set_user_id(msg, uid):
msg.impl.setUserId(uid)
return 0
def pn_message_instructions(msg):
return msg.instructions
def pn_message_annotations(msg):
return msg.annotations
def pn_message_properties(msg):
return msg.properties
def pn_message_body(msg):
return msg.body
def pn_message_decode(msg, data):
n = msg.impl.decode(array(data, 'b'), 0, len(data))
msg.post_decode()
return n
from java.nio import BufferOverflowException
def pn_message_encode(msg, size):
msg.pre_encode()
ba = zeros(size, 'b')
# XXX: shouldn't have to use the try/catch
try:
n = msg.impl.encode(ba, 0, size)
if n >= 0:
return n, ba[:n].tostring()
else:
return n
except BufferOverflowException, e:
return PN_OVERFLOW, None
def pn_message_clear(msg):
msg.impl.clear()