blob: 9c360a6f35a5cbee565e7bd19524ade72a9e7d52 [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Generate encoded AMQP fragments for interop testing.
import logging, optparse, os, struct, sys, time, traceback, types, cgi
from proton import *
def main(argv):
    """Generate encoded AMQP fragments for interop testing.

    Builds a fixed set of proton ``Message``/``Data`` objects and writes the
    result of each object's ``encode()`` to ``<name>.amqp`` in the current
    working directory.  The files produced are: message, null, primitives,
    strings, described, described_array, arrays, lists and maps.

    Args:
        argv: Command-line arguments.  Currently unused; accepted so the
            function can be called as ``main(sys.argv)``.

    Returns:
        0, used by the caller as the process exit status.
    """

    def write(data, filename):
        """Write ``data.encode()`` to ``filename + ".amqp"``.

        ``data`` is any object exposing ``encode()`` (both ``Message`` and
        ``Data`` instances are passed in below).
        """
        # The encoded form is raw binary, so the file must be opened in
        # binary mode: text mode would translate newline bytes on Windows and
        # rejects bytes outright on Python 3.  The context manager guarantees
        # the handle is closed even if encode() or write() raises.
        with open(filename + ".amqp", 'wb') as f:
            f.write(data.encode())

    # message: a Message whose body is the encoded form of a single string
    m = Message()
    d = Data()
    d.put_string("hello")
    m.body = d.encode()
    write(m, "message")

    # null
    d = Data()
    d.put_null()
    write(d, "null")

    # primitive types
    d = Data()
    d.put_bool(True)
    d.put_bool(False)
    d.put_ubyte(42)
    d.put_ushort(42)
    d.put_short(-42)
    d.put_uint(12345)
    d.put_int(-12345)
    d.put_ulong(12345)
    d.put_long(-12345)
    d.put_float(0.125)
    d.put_double(0.125)
    write(d, "primitives")

    # string types: binary, string and symbol, each with a non-empty and an
    # empty value.  The binary payloads are bytes literals; on Python 2 these
    # are the same objects as plain str literals.
    d = Data()
    d.put_binary(b"abc\0defg")
    d.put_string("abcdefg")
    d.put_symbol("abcdefg")
    d.put_binary(b"")
    d.put_string("")
    d.put_symbol("")
    write(d, "strings")

    # described types: inside each described node the first value put is the
    # descriptor and the second is the described value.
    d = Data()
    d.put_described()
    d.enter()
    d.put_symbol("foo-descriptor")
    d.put_string("foo-value")
    d.exit()
    d.put_described()
    d.enter()
    d.put_int(12)
    d.put_int(13)
    d.exit()
    write(d, "described")

    # described array: the first child is the descriptor symbol, followed by
    # the INT elements.
    d = Data()
    d.put_array(True, Data.INT)
    d.enter()
    d.put_symbol("int-array")
    for i in range(10):
        d.put_int(i)
    d.exit()
    write(d, "described_array")

    # Arrays: all three arrays below go into the same Data object / file.
    # Integer array
    d = Data()
    d.put_array(False, Data.INT)
    d.enter()
    for i in range(100):
        d.put_int(i)
    d.exit()
    # String array
    d.put_array(False, Data.STRING)
    d.enter()
    for s in ["a", "b", "c"]:
        d.put_string(s)
    d.exit()
    # empty array
    d.put_array(False, Data.INT)
    write(d, "arrays")

    # List - mixed types
    d = Data()
    d.put_list()
    d.enter()
    d.put_int(32)
    d.put_string("foo")
    d.put_bool(True)
    d.exit()
    d.put_list()  # Empty list
    write(d, "lists")

    # Maps
    # NOTE: entries are emitted in dict iteration order, which is not the
    # same across Python versions, so the byte layout of maps.amqp can vary
    # with the interpreter used to generate it.
    d = Data()
    d.put_map()
    d.enter()
    for k, v in {"one": 1, "two": 2, "three": 3}.items():
        d.put_string(k)
        d.put_int(v)
    d.exit()
    d.put_map()
    d.enter()
    for k, v in {1: "one", 2: "two", 3: "three"}.items():
        d.put_int(k)
        d.put_string(v)
    d.exit()
    d.put_map()  # Empty map
    write(d, "maps")

    return 0
# Script entry point: run the generator and propagate its return value as the
# process exit status.
if __name__ == "__main__":
    exit_status = main(sys.argv)
    sys.exit(exit_status)