| #!/usr/bin/env python |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| from unittest import TestCase, main |
| import pulsar |
| from pulsar.schema import * |
| from enum import Enum |
| import json |
| |
| |
| class SchemaTest(TestCase): |
| |
| serviceUrl = 'pulsar://localhost:6650' |
| |
| def test_simple(self): |
| class Color(Enum): |
| red = 1 |
| green = 2 |
| blue = 3 |
| |
| class Example(Record): |
| a = String() |
| b = Integer() |
| c = Array(String()) |
| d = Color |
| e = Boolean() |
| f = Float() |
| g = Double() |
| h = Bytes() |
| i = Map(String()) |
| |
| self.assertEqual(Example.schema(), { |
| "name": "Example", |
| "type": "record", |
| "fields": [ |
| {"name": "a", "type": ["null", "string"]}, |
| {"name": "b", "type": ["null", "int"]}, |
| {"name": "c", "type": ["null", { |
| "type": "array", |
| "items": "string"}] |
| }, |
| {"name": "d", |
| "type": ["null", { |
| "type": "enum", |
| "name": "Color", |
| "symbols": ["red", "green", "blue"]}] |
| }, |
| {"name": "e", "type": ["null", "boolean"]}, |
| {"name": "f", "type": ["null", "float"]}, |
| {"name": "g", "type": ["null", "double"]}, |
| {"name": "h", "type": ["null", "bytes"]}, |
| {"name": "i", "type": ["null", { |
| "type": "map", |
| "values": "string"}] |
| }, |
| ] |
| }) |
| |
| def test_complex(self): |
| class MySubRecord(Record): |
| x = Integer() |
| y = Long() |
| z = String() |
| |
| class Example(Record): |
| a = String() |
| sub = MySubRecord # Test with class |
| sub2 = MySubRecord() # Test with instance |
| |
| self.assertEqual(Example.schema(), { |
| "name": "Example", |
| "type": "record", |
| "fields": [ |
| {"name": "a", "type": ["null", "string"]}, |
| {"name": "sub", |
| "type": ["null", { |
| "name": "MySubRecord", |
| "type": "record", |
| "fields": [{"name": "x", "type": ["null", "int"]}, |
| {"name": "y", "type": ["null", "long"]}, |
| {"name": "z", "type": ["null", "string"]}] |
| }] |
| }, |
| {"name": "sub2", |
| "type": ["null", { |
| "name": "MySubRecord", |
| "type": "record", |
| "fields": [{"name": "x", "type": ["null", "int"]}, |
| {"name": "y", "type": ["null", "long"]}, |
| {"name": "z", "type": ["null", "string"]}] |
| }] |
| } |
| ] |
| }) |
| |
| def test_complex_with_required_fields(self): |
| class MySubRecord(Record): |
| x = Integer(required=True) |
| y = Long(required=True) |
| z = String() |
| |
| class Example(Record): |
| a = String(required=True) |
| sub = MySubRecord(required=True) |
| |
| self.assertEqual(Example.schema(), { |
| "name": "Example", |
| "type": "record", |
| "fields": [ |
| {"name": "a", "type": "string"}, |
| {"name": "sub", |
| "type": { |
| "name": "MySubRecord", |
| "type": "record", |
| "fields": [{"name": "x", "type": "int"}, |
| {"name": "y", "type": "long"}, |
| {"name": "z", "type": ["null", "string"]}] |
| } |
| }, |
| ] |
| }) |
| |
| def test_invalid_enum(self): |
| class Color: |
| red = 1 |
| green = 2 |
| blue = 3 |
| |
| class InvalidEnum(Record): |
| a = Integer() |
| b = Color |
| |
| # Enum will be ignored |
| self.assertEqual(InvalidEnum.schema(), |
| {'name': 'InvalidEnum', 'type': 'record', |
| 'fields': [{'name': 'a', 'type': ["null", 'int']}]}) |
| |
| def test_initialization(self): |
| class Example(Record): |
| a = Integer() |
| b = Integer() |
| |
| r = Example(a=1, b=2) |
| self.assertEqual(r.a, 1) |
| self.assertEqual(r.b, 2) |
| |
| r.b = 5 |
| |
| self.assertEqual(r.b, 5) |
| |
| # Setting non-declared field should fail |
| try: |
| r.c = 3 |
| self.fail('Should have failed') |
| except AttributeError: |
| # Expected |
| pass |
| |
| try: |
| Record(a=1, c=8) |
| self.fail('Should have failed') |
| except AttributeError: |
| # Expected |
| pass |
| |
| try: |
| Record('xyz', a=1, b=2) |
| self.fail('Should have failed') |
| except TypeError: |
| # Expected |
| pass |
| |
| def _expectTypeError(self, func): |
| try: |
| func() |
| self.fail('Should have failed') |
| except TypeError: |
| # Expected |
| pass |
| |
| def test_field_type_check(self): |
| class Example(Record): |
| a = Integer() |
| b = String(required=False) |
| |
| self._expectTypeError(lambda: Example(a=1, b=2)) |
| |
| class E2(Record): |
| a = Boolean() |
| |
| E2(a=False) # ok |
| self._expectTypeError(lambda: E2(a=1)) |
| |
| class E3(Record): |
| a = Float() |
| |
| E3(a=1.0) # Ok |
| self._expectTypeError(lambda: E3(a=1)) |
| |
| class E4(Record): |
| a = Null() |
| |
| E4(a=None) # Ok |
| self._expectTypeError(lambda: E4(a=1)) |
| |
| class E5(Record): |
| a = Long() |
| |
| E5(a=1234) # Ok |
| self._expectTypeError(lambda: E5(a=1.12)) |
| |
| class E6(Record): |
| a = String() |
| |
| E6(a="hello") # Ok |
| self._expectTypeError(lambda: E5(a=1.12)) |
| |
| class E6(Record): |
| a = Bytes() |
| |
| E6(a="hello".encode('utf-8')) # Ok |
| self._expectTypeError(lambda: E5(a=1.12)) |
| |
| class E7(Record): |
| a = Double() |
| |
| E7(a=1.0) # Ok |
| self._expectTypeError(lambda: E3(a=1)) |
| |
| class Color(Enum): |
| red = 1 |
| green = 2 |
| blue = 3 |
| |
| class OtherEnum(Enum): |
| red = 1 |
| green = 2 |
| blue = 3 |
| |
| class E8(Record): |
| a = Color |
| |
| e = E8(a=Color.red) # Ok |
| self.assertEqual(e.a, Color.red) |
| |
| e = E8(a='red') # Ok |
| self.assertEqual(e.a, Color.red) |
| |
| e = E8(a=1) # Ok |
| self.assertEqual(e.a, Color.red) |
| |
| self._expectTypeError(lambda: E8(a='redx')) |
| self._expectTypeError(lambda: E8(a=OtherEnum.red)) |
| self._expectTypeError(lambda: E8(a=5)) |
| |
| class E9(Record): |
| a = Array(String()) |
| |
| E9(a=['a', 'b', 'c']) # Ok |
| self._expectTypeError(lambda: E9(a=1)) |
| self._expectTypeError(lambda: E9(a=[1, 2, 3])) |
| self._expectTypeError(lambda: E9(a=['1', '2', 3])) |
| |
| class E10(Record): |
| a = Map(Integer()) |
| |
| E10(a={'a': 1, 'b': 2}) # Ok |
| self._expectTypeError(lambda: E10(a=1)) |
| self._expectTypeError(lambda: E10(a={'a': '1', 'b': 2})) |
| self._expectTypeError(lambda: E10(a={1: 1, 'b': 2})) |
| |
| class SubRecord1(Record): |
| s = Integer() |
| |
| class SubRecord2(Record): |
| s = String() |
| |
| class E11(Record): |
| a = SubRecord1 |
| |
| E11(a=SubRecord1(s=1)) # Ok |
| self._expectTypeError(lambda: E11(a=1)) |
| self._expectTypeError(lambda: E11(a=SubRecord2(s='hello'))) |
| |
| def test_field_type_check_defaults(self): |
| try: |
| class Example(Record): |
| a = Integer(default="xyz") |
| |
| self.fail("Class declaration should have failed") |
| except TypeError: |
| pass # Expected |
| |
| |
| def test_serialize_json(self): |
| class Example(Record): |
| a = Integer() |
| b = Integer() |
| |
| self.assertEqual(Example.schema(), { |
| "name": "Example", |
| "type": "record", |
| "fields": [ |
| {"name": "a", "type": ["null", "int"]}, |
| {"name": "b", "type": ["null", "int"]}, |
| ] |
| }) |
| |
| s = JsonSchema(Example) |
| r = Example(a=1, b=2) |
| data = s.encode(r) |
| self.assertEqual(json.loads(data), {'a': 1, 'b': 2}) |
| |
| r2 = s.decode(data) |
| self.assertEqual(r2.__class__.__name__, 'Example') |
| self.assertEqual(r2, r) |
| |
| def test_serialize_avro(self): |
| class Example(Record): |
| a = Integer() |
| b = Integer() |
| |
| self.assertEqual(Example.schema(), { |
| "name": "Example", |
| "type": "record", |
| "fields": [ |
| {"name": "a", "type": ["null", "int"]}, |
| {"name": "b", "type": ["null", "int"]}, |
| ] |
| }) |
| |
| s = AvroSchema(Example) |
| r = Example(a=1, b=2) |
| data = s.encode(r) |
| |
| r2 = s.decode(data) |
| self.assertEqual(r2.__class__.__name__, 'Example') |
| self.assertEqual(r2, r) |
| |
| def test_schema_version(self): |
| class Example(Record): |
| a = Integer() |
| b = Integer() |
| |
| client = pulsar.Client(self.serviceUrl) |
| producer = client.create_producer( |
| 'my-avro-python-schema-version-topic', |
| schema=AvroSchema(Example)) |
| |
| consumer = client.subscribe('my-avro-python-schema-version-topic', 'sub-1', |
| schema=AvroSchema(Example)) |
| |
| r = Example(a=1, b=2) |
| producer.send(r) |
| |
| msg = consumer.receive() |
| |
| self.assertIsNotNone(msg.schema_version()) |
| |
| self.assertEquals(b'\x00\x00\x00\x00\x00\x00\x00\x00', msg.schema_version().encode()) |
| |
| self.assertEqual(r, msg.value()) |
| |
| client.close() |
| |
| def test_serialize_wrong_types(self): |
| class Example(Record): |
| a = Integer() |
| b = Integer() |
| |
| class Foo(Record): |
| x = Integer() |
| y = Integer() |
| |
| s = JsonSchema(Example) |
| try: |
| data = s.encode(Foo(x=1, y=2)) |
| self.fail('Should have failed') |
| except TypeError: |
| pass # expected |
| |
| try: |
| data = s.encode('hello') |
| self.fail('Should have failed') |
| except TypeError: |
| pass # expected |
| |
| def test_defaults(self): |
| class Example(Record): |
| a = Integer(default=5) |
| b = Integer() |
| c = String(default='hello') |
| |
| r = Example() |
| self.assertEqual(r.a, 5) |
| self.assertEqual(r.b, 0) |
| self.assertEqual(r.c, 'hello') |
| |
| #### |
| |
| def test_json_schema(self): |
| |
| class Example(Record): |
| a = Integer() |
| b = Integer() |
| |
| # Incompatible variation of the class |
| class BadExample(Record): |
| a = String() |
| b = Integer() |
| |
| client = pulsar.Client(self.serviceUrl) |
| producer = client.create_producer( |
| 'my-json-python-topic', |
| schema=JsonSchema(Example)) |
| |
| |
| # Validate that incompatible schema is rejected |
| try: |
| client.subscribe('my-json-python-topic', 'sub-1', |
| schema=JsonSchema(BadExample)) |
| self.fail('Should have failed') |
| except Exception as e: |
| pass # Expected |
| |
| try: |
| client.subscribe('my-json-python-topic', 'sub-1', |
| schema=StringSchema(BadExample)) |
| self.fail('Should have failed') |
| except Exception as e: |
| pass # Expected |
| |
| try: |
| client.subscribe('my-json-python-topic', 'sub-1', |
| schema=AvroSchema(BadExample)) |
| self.fail('Should have failed') |
| except Exception as e: |
| pass # Expected |
| |
| consumer = client.subscribe('my-json-python-topic', 'sub-1', |
| schema=JsonSchema(Example)) |
| |
| r = Example(a=1, b=2) |
| producer.send(r) |
| |
| msg = consumer.receive() |
| |
| self.assertEqual(r, msg.value()) |
| client.close() |
| |
| def test_string_schema(self): |
| client = pulsar.Client(self.serviceUrl) |
| producer = client.create_producer( |
| 'my-string-python-topic', |
| schema=StringSchema()) |
| |
| |
| # Validate that incompatible schema is rejected |
| try: |
| class Example(Record): |
| a = Integer() |
| b = Integer() |
| |
| client.create_producer('my-string-python-topic', |
| schema=JsonSchema(Example)) |
| self.fail('Should have failed') |
| except Exception as e: |
| pass # Expected |
| |
| consumer = client.subscribe('my-string-python-topic', 'sub-1', |
| schema=StringSchema()) |
| |
| producer.send("Hello") |
| |
| msg = consumer.receive() |
| |
| self.assertEqual("Hello", msg.value()) |
| self.assertEqual(b"Hello", msg.data()) |
| client.close() |
| |
| |
| def test_bytes_schema(self): |
| client = pulsar.Client(self.serviceUrl) |
| producer = client.create_producer( |
| 'my-bytes-python-topic', |
| schema=BytesSchema()) |
| |
| |
| # Validate that incompatible schema is rejected |
| try: |
| class Example(Record): |
| a = Integer() |
| b = Integer() |
| |
| client.create_producer('my-bytes-python-topic', |
| schema=JsonSchema(Example)) |
| self.fail('Should have failed') |
| except Exception as e: |
| pass # Expected |
| |
| consumer = client.subscribe('my-bytes-python-topic', 'sub-1', |
| schema=BytesSchema()) |
| |
| producer.send(b"Hello") |
| |
| msg = consumer.receive() |
| |
| self.assertEqual(b"Hello", msg.value()) |
| client.close() |
| |
| def test_avro_schema(self): |
| |
| class Example(Record): |
| a = Integer() |
| b = Integer() |
| |
| # Incompatible variation of the class |
| class BadExample(Record): |
| a = String() |
| b = Integer() |
| |
| client = pulsar.Client(self.serviceUrl) |
| producer = client.create_producer( |
| 'my-avro-python-topic', |
| schema=AvroSchema(Example)) |
| |
| # Validate that incompatible schema is rejected |
| try: |
| client.subscribe('my-avro-python-topic', 'sub-1', |
| schema=AvroSchema(BadExample)) |
| self.fail('Should have failed') |
| except Exception as e: |
| pass # Expected |
| |
| try: |
| client.subscribe('my-avro-python-topic', 'sub-2', |
| schema=JsonSchema(Example)) |
| self.fail('Should have failed') |
| except Exception as e: |
| pass # Expected |
| |
| consumer = client.subscribe('my-avro-python-topic', 'sub-3', |
| schema=AvroSchema(Example)) |
| |
| r = Example(a=1, b=2) |
| producer.send(r) |
| |
| msg = consumer.receive() |
| |
| self.assertEqual(r, msg.value()) |
| client.close() |
| |
| def test_json_enum(self): |
| class MyEnum(Enum): |
| A = 1 |
| B = 2 |
| C = 3 |
| |
| class Example(Record): |
| name = String() |
| v = MyEnum |
| |
| topic = 'my-json-enum-topic' |
| |
| client = pulsar.Client(self.serviceUrl) |
| producer = client.create_producer( |
| topic=topic, |
| schema=JsonSchema(Example)) |
| |
| consumer = client.subscribe(topic, 'test', |
| schema=JsonSchema(Example)) |
| |
| r = Example(name='test', v=MyEnum.C) |
| producer.send(r) |
| |
| msg = consumer.receive() |
| |
| self.assertEqual('test', msg.value().name) |
| self.assertEqual(MyEnum.C, MyEnum(msg.value().v)) |
| client.close() |
| |
| def test_avro_enum(self): |
| class MyEnum(Enum): |
| A = 1 |
| B = 2 |
| C = 3 |
| |
| class Example(Record): |
| name = String() |
| v = MyEnum |
| |
| topic = 'my-avro-enum-topic' |
| |
| client = pulsar.Client(self.serviceUrl) |
| producer = client.create_producer( |
| topic=topic, |
| schema=AvroSchema(Example)) |
| |
| consumer = client.subscribe(topic, 'test', |
| schema=AvroSchema(Example)) |
| |
| r = Example(name='test', v=MyEnum.C) |
| producer.send(r) |
| |
| msg = consumer.receive() |
| msg.value() |
| self.assertEqual(MyEnum.C, msg.value().v) |
| client.close() |
| |
| def test_avro_map_array(self): |
| class MapArray(Record): |
| values = Map(Array(Integer())) |
| |
| class MapMap(Record): |
| values = Map(Map(Integer())) |
| |
| class ArrayMap(Record): |
| values = Array(Map(Integer())) |
| |
| class ArrayArray(Record): |
| values = Array(Array(Integer())) |
| |
| topic_prefix = "my-avro-map-array-topic-" |
| data_list = ( |
| (topic_prefix + "0", AvroSchema(MapArray), |
| MapArray(values={"A": [1, 2], "B": [3]})), |
| (topic_prefix + "1", AvroSchema(MapMap), |
| MapMap(values={"A": {"B": 2},})), |
| (topic_prefix + "2", AvroSchema(ArrayMap), |
| ArrayMap(values=[{"A": 1}, {"B": 2}, {"C": 3}])), |
| (topic_prefix + "3", AvroSchema(ArrayArray), |
| ArrayArray(values=[[1, 2, 3], [4]])), |
| ) |
| |
| client = pulsar.Client(self.serviceUrl) |
| for data in data_list: |
| topic = data[0] |
| schema = data[1] |
| record = data[2] |
| |
| producer = client.create_producer(topic, schema=schema) |
| consumer = client.subscribe(topic, 'sub', schema=schema) |
| |
| producer.send(record) |
| msg = consumer.receive() |
| self.assertEqual(msg.value().values, record.values) |
| consumer.acknowledge(msg) |
| consumer.close() |
| producer.close() |
| |
| client.close() |
| |
| def test_default_value(self): |
| class MyRecord(Record): |
| A = Integer() |
| B = String() |
| C = Boolean() |
| D = Double(default=6.4) |
| |
| topic = "my-default-value-topic" |
| |
| client = pulsar.Client(self.serviceUrl) |
| producer = client.create_producer( |
| topic=topic, |
| schema=JsonSchema(MyRecord)) |
| |
| consumer = client.subscribe(topic, 'test', schema=JsonSchema(MyRecord)) |
| |
| r = MyRecord(A=5, B="text") |
| producer.send(r) |
| |
| msg = consumer.receive() |
| self.assertEqual(msg.value().A, 5) |
| self.assertEqual(msg.value().B, u'text') |
| self.assertEqual(msg.value().C, False) |
| self.assertEqual(msg.value().D, 6.4) |
| |
| producer.close() |
| consumer.close() |
| client.close() |
| |
| if __name__ == '__main__': |
| main() |