| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import pytest |
| from sqlalchemy import Column, Text, Integer, event |
| |
| from aria.modeling import ( |
| mixins, |
| types as modeling_types, |
| models |
| ) |
| from aria.modeling.exceptions import ValueFormatException |
| from aria.storage import ( |
| ModelStorage, |
| sql_mapi, |
| instrumentation |
| ) |
| |
| from . import release_sqlite_storage, init_inmemory_model_storage |
| |
# Aliases for the private instrumentation names that the expected values below are built from.
Value = instrumentation._Value
STUB = instrumentation._STUB

# Every instrument created by TestInstrumentation._track_changes is appended here; the
# restore_instrumentation fixture restores them and empties the list after each test.
instruments_holder = []
| |
| |
class TestInstrumentation(object):
    """Tests for the change tracking provided by ``aria.storage.instrumentation``.

    Instruments are always created through :meth:`_track_changes`, which also appends them to
    the module-level ``instruments_holder`` list.
    """

    def test_track_changes(self, storage):
        """Changes are expected only for the attributes registered per model.

        ``MockModel1`` attributes are re-assigned, while the dict/list attributes of
        ``MockModel2`` are mutated in place; unregistered attributes must not appear in the
        expected ``tracked_changes``.
        """
        model_kwargs = dict(
            name='name',
            dict1={'initial': 'value'},
            dict2={'initial': 'value'},
            list1=['initial'],
            list2=['initial'],
            int1=0,
            int2=0,
            string2='string')
        model1_instance = MockModel1(**model_kwargs)
        model2_instance = MockModel2(**model_kwargs)
        storage.mock_model_1.put(model1_instance)
        storage.mock_model_2.put(model2_instance)

        instrument = self._track_changes({
            MockModel1.dict1: dict,
            MockModel1.list1: list,
            MockModel1.int1: int,
            MockModel1.string2: str,
            MockModel2.dict2: dict,
            MockModel2.list2: list,
            MockModel2.int2: int,
            MockModel2.name: str
        })

        assert not instrument.tracked_changes

        storage_model1_instance = storage.mock_model_1.get(model1_instance.id)
        storage_model2_instance = storage.mock_model_2.get(model2_instance.id)

        storage_model1_instance.dict1 = {'hello': 'world'}
        storage_model1_instance.dict2 = {'should': 'not track'}
        storage_model1_instance.list1 = ['hello']
        storage_model1_instance.list2 = ['should not track']
        storage_model1_instance.int1 = 100
        storage_model1_instance.int2 = 20000
        storage_model1_instance.name = 'should not track'
        storage_model1_instance.string2 = 'new_string'

        storage_model2_instance.dict1.update({'should': 'not track'})
        storage_model2_instance.dict2.update({'hello': 'world'})
        storage_model2_instance.list1.append('should not track')
        storage_model2_instance.list2.append('hello')
        storage_model2_instance.int1 = 100
        storage_model2_instance.int2 = 20000
        storage_model2_instance.name = 'new_name'
        storage_model2_instance.string2 = 'should not track'

        assert instrument.tracked_changes == {
            'mock_model_1': {
                model1_instance.id: {
                    'dict1': Value(STUB, {'hello': 'world'}),
                    'list1': Value(STUB, ['hello']),
                    'int1': Value(STUB, 100),
                    'string2': Value(STUB, 'new_string')
                }
            },
            'mock_model_2': {
                model2_instance.id: {
                    'dict2': Value({'initial': 'value'}, {'hello': 'world', 'initial': 'value'}),
                    'list2': Value(['initial'], ['initial', 'hello']),
                    'int2': Value(STUB, 20000),
                    'name': Value(STUB, 'new_name'),
                }
            }
        }

    def test_attribute_initial_none_value(self, storage):
        """An instance that is loaded but never modified is expected as ``Value(None, None)``."""
        instance1 = MockModel1(name='name1', dict1=None)
        instance2 = MockModel1(name='name2', dict1=None)
        storage.mock_model_1.put(instance1)
        storage.mock_model_1.put(instance2)
        instrument = self._track_changes({MockModel1.dict1: dict})
        instance1 = storage.mock_model_1.get(instance1.id)
        instance2 = storage.mock_model_1.get(instance2.id)
        instance1.dict1 = {'new': 'value'}
        assert instrument.tracked_changes == {
            'mock_model_1': {
                instance1.id: {'dict1': Value(STUB, {'new': 'value'})},
                instance2.id: {'dict1': Value(None, None)},
            }
        }

    def test_attribute_set_none_value(self, storage):
        """Assigning ``None`` to tracked attributes is expected to be recorded as a change."""
        instance = MockModel1(name='name')
        storage.mock_model_1.put(instance)
        instrument = self._track_changes({
            MockModel1.dict1: dict,
            MockModel1.list1: list,
            MockModel1.string2: str,
            MockModel1.int1: int
        })
        instance = storage.mock_model_1.get(instance.id)
        instance.dict1 = None
        instance.list1 = None
        instance.string2 = None
        instance.int1 = None
        assert instrument.tracked_changes == {
            'mock_model_1': {
                instance.id: {
                    'dict1': Value(STUB, None),
                    'list1': Value(STUB, None),
                    'string2': Value(STUB, None),
                    'int1': Value(STUB, None)
                }
            }
        }

    def _assert_restore(self):
        """Track one attribute, restore the instrument and check its listeners are removed.

        Shared by :meth:`test_restore` and :meth:`test_restore_twice` so that neither test has
        to return a value or call the other test directly.

        :return: the instrument, after ``restore()`` has been called on it once
        """
        instrument = self._track_changes({MockModel1.dict1: dict})
        # set instance attribute, load instance, refresh instance and flush_refresh listeners
        assert len(instrument.listeners) == 4
        for listener_args in instrument.listeners:
            assert event.contains(*listener_args)
        instrument.restore()
        # the listeners are still listed on the instrument, but no longer registered
        assert len(instrument.listeners) == 4
        for listener_args in instrument.listeners:
            assert not event.contains(*listener_args)
        return instrument

    def test_restore(self):
        """``restore()`` is expected to unregister every listener of the instrument."""
        self._assert_restore()

    def test_restore_twice(self):
        """Calling ``restore()`` on an already restored instrument must not raise."""
        instrument = self._assert_restore()
        instrument.restore()

    def test_instrumentation_context_manager(self, storage):
        """Listeners are registered inside the ``with`` block and removed once it exits."""
        instance = MockModel1(name='name')
        storage.mock_model_1.put(instance)
        with self._track_changes({MockModel1.dict1: dict}) as instrument:
            instance = storage.mock_model_1.get(instance.id)
            instance.dict1 = {'new': 'value'}
            assert instrument.tracked_changes == {
                'mock_model_1': {instance.id: {'dict1': Value(STUB, {'new': 'value'})}}
            }
            assert len(instrument.listeners) == 4
            for listener_args in instrument.listeners:
                assert event.contains(*listener_args)
        for listener_args in instrument.listeners:
            assert not event.contains(*listener_args)

    def test_apply_tracked_changes(self, storage):
        """``apply_tracked_changes`` is expected to re-apply tracked values over stored ones.

        Changes are tracked, the instrument is restored and the session expired, different
        values are stored, and finally the tracked changes are applied on top of them.
        """
        initial_values = {'dict1': {'initial': 'value'}, 'list1': ['initial']}
        instance1_1 = MockModel1(name='instance1_1', **initial_values)
        instance1_2 = MockModel1(name='instance1_2', **initial_values)
        instance2_1 = MockModel2(name='instance2_1', **initial_values)
        instance2_2 = MockModel2(name='instance2_2', **initial_values)
        storage.mock_model_1.put(instance1_1)
        storage.mock_model_1.put(instance1_2)
        storage.mock_model_2.put(instance2_1)
        storage.mock_model_2.put(instance2_2)

        instrument = self._track_changes({
            MockModel1.dict1: dict,
            MockModel1.list1: list,
            MockModel2.dict1: dict,
            MockModel2.list1: list
        })

        def get_instances():
            # reads the enclosing instance names at call time, so it keeps working after
            # they are rebound to the freshly loaded instances below
            return (storage.mock_model_1.get(instance1_1.id),
                    storage.mock_model_1.get(instance1_2.id),
                    storage.mock_model_2.get(instance2_1.id),
                    storage.mock_model_2.get(instance2_2.id))

        instance1_1, instance1_2, instance2_1, instance2_2 = get_instances()
        instance1_1.dict1 = {'new': 'value'}
        instance1_2.list1 = ['new_value']
        instance2_1.dict1.update({'new': 'value'})
        instance2_2.list1.append('new_value')

        instrument.restore()
        storage.mock_model_1._session.expire_all()

        instance1_1, instance1_2, instance2_1, instance2_2 = get_instances()
        instance1_1.dict1 = {'overriding': 'value'}
        instance1_2.list1 = ['overriding_value']
        instance2_1.dict1 = {'overriding': 'value'}
        instance2_2.list1 = ['overriding_value']
        storage.mock_model_1.put(instance1_1)
        storage.mock_model_1.put(instance1_2)
        storage.mock_model_2.put(instance2_1)
        storage.mock_model_2.put(instance2_2)
        instance1_1, instance1_2, instance2_1, instance2_2 = get_instances()
        assert instance1_1.dict1 == {'overriding': 'value'}
        assert instance1_2.list1 == ['overriding_value']
        assert instance2_1.dict1 == {'overriding': 'value'}
        assert instance2_2.list1 == ['overriding_value']

        instrumentation.apply_tracked_changes(
            tracked_changes=instrument.tracked_changes,
            new_instances={},
            model=storage)

        instance1_1, instance1_2, instance2_1, instance2_2 = get_instances()
        assert instance1_1.dict1 == {'new': 'value'}
        assert instance1_2.list1 == ['new_value']
        assert instance2_1.dict1 == {'initial': 'value', 'new': 'value'}
        assert instance2_2.list1 == ['initial', 'new_value']

    def test_clear_instance(self, storage):
        """``clear(instance)`` is expected to drop only that instance's tracked changes."""
        instance1 = MockModel1(name='name1')
        instance2 = MockModel1(name='name2')
        for instance in [instance1, instance2]:
            storage.mock_model_1.put(instance)
        instrument = self._track_changes({MockModel1.dict1: dict})
        instance1.dict1 = {'new': 'value'}
        instance2.dict1 = {'new2': 'value2'}
        assert instrument.tracked_changes == {
            'mock_model_1': {
                instance1.id: {'dict1': Value(STUB, {'new': 'value'})},
                instance2.id: {'dict1': Value(STUB, {'new2': 'value2'})}
            }
        }
        instrument.clear(instance1)
        assert instrument.tracked_changes == {
            'mock_model_1': {
                instance2.id: {'dict1': Value(STUB, {'new2': 'value2'})}
            }
        }

    def test_clear_all(self, storage):
        """``clear()`` without arguments is expected to empty ``tracked_changes``."""
        instance1 = MockModel1(name='name1')
        instance2 = MockModel1(name='name2')
        for instance in [instance1, instance2]:
            storage.mock_model_1.put(instance)
        instrument = self._track_changes({MockModel1.dict1: dict})
        instance1.dict1 = {'new': 'value'}
        instance2.dict1 = {'new2': 'value2'}
        assert instrument.tracked_changes == {
            'mock_model_1': {
                instance1.id: {'dict1': Value(STUB, {'new': 'value'})},
                instance2.id: {'dict1': Value(STUB, {'new2': 'value2'})}
            }
        }
        instrument.clear()
        assert instrument.tracked_changes == {}

    def test_new_instances(self, storage):
        """Only models listed in ``instrumented_new`` are expected in ``new_instances``."""
        model_kwargs = dict(
            name='name',
            dict1={'initial': 'value'},
            dict2={'initial': 'value'},
            list1=['initial'],
            list2=['initial'],
            int1=0,
            int2=0,
            string2='string')
        model_instance_1 = MockModel1(**model_kwargs)
        model_instance_2 = MockModel2(**model_kwargs)

        instrument = self._track_changes(model=storage, instrumented_new=(MockModel1,))
        assert not instrument.tracked_changes

        storage.mock_model_1.put(model_instance_1)
        storage.mock_model_2.put(model_instance_2)
        # Assert all models made it to storage
        assert len(storage.mock_model_1.list()) == len(storage.mock_model_2.list()) == 1

        # Assert only one model was tracked
        assert len(instrument.new_instances) == 1

        tracked_new = instrument.new_instances[MockModel1.__tablename__]
        # list(...) keeps the lookup working when ``values()`` returns a non-indexable view
        mock_model_1 = list(tracked_new.values())[0]
        storage_model1_instance = storage.mock_model_1.get(model_instance_1.id)

        for key in model_kwargs:
            assert mock_model_1[key] == model_kwargs[key] == getattr(storage_model1_instance, key)

    def _track_changes(self, instrumented_modified=None, model=None, instrumented_new=None):
        """Create an instrument and register it in ``instruments_holder``.

        :param instrumented_modified: passed as the ``'modified'`` entry of ``instrumented``;
         an empty dict is used when falsy
        :param model: forwarded unchanged to ``instrumentation.track_changes``
        :param instrumented_new: passed as the ``'new'`` entry of ``instrumented``; an empty
         dict is used when falsy
        :return: the object returned by ``instrumentation.track_changes``
        """
        instrument = instrumentation.track_changes(
            model=model,
            instrumented={'modified': instrumented_modified or {}, 'new': instrumented_new or {}})
        instruments_holder.append(instrument)
        return instrument

    def test_track_changes_to_strict_dict(self, storage):
        """Invalid strict dict/list assignments raise and only valid ones are tracked."""
        model_kwargs = dict(strict_dict={'key': 'value'},
                            strict_list=['item'])
        model_instance = StrictMockModel(**model_kwargs)
        storage.strict_mock_model.put(model_instance)

        instrument = self._track_changes({
            StrictMockModel.strict_dict: dict,
            StrictMockModel.strict_list: list,
        })

        assert not instrument.tracked_changes

        storage_model_instance = storage.strict_mock_model.get(model_instance.id)

        with pytest.raises(ValueFormatException):
            storage_model_instance.strict_dict = {1: 1}

        with pytest.raises(ValueFormatException):
            storage_model_instance.strict_dict = {'hello': 1}

        with pytest.raises(ValueFormatException):
            storage_model_instance.strict_dict = {1: 'hello'}

        storage_model_instance.strict_dict = {'hello': 'world'}
        assert storage_model_instance.strict_dict == {'hello': 'world'}

        with pytest.raises(ValueFormatException):
            storage_model_instance.strict_list = [1]
        storage_model_instance.strict_list = ['hello']
        assert storage_model_instance.strict_list == ['hello']

        assert instrument.tracked_changes == {
            'strict_mock_model': {
                model_instance.id: {
                    'strict_dict': Value(STUB, {'hello': 'world'}),
                    'strict_list': Value(STUB, ['hello']),
                }
            },
        }
| |
| |
@pytest.fixture(autouse=True)
def restore_instrumentation():
    """After the test body has run, restore every registered instrument and empty the holder."""
    yield
    for tracked_instrument in instruments_holder:
        tracked_instrument.restore()
    # empty the shared list in place so the module-level name keeps pointing at the same object
    instruments_holder[:] = []
| |
| |
@pytest.fixture
def storage():
    """Yield a ``ModelStorage`` over the mock models and release it after the test.

    The storage is built with ``sql_mapi.SQLAlchemyModelAPI`` and initialised through
    ``init_inmemory_model_storage``; once the test is done it is handed to
    ``release_sqlite_storage``.
    """
    mock_models = (MockModel1, MockModel2, StrictMockModel)
    model_storage = ModelStorage(
        api_cls=sql_mapi.SQLAlchemyModelAPI,
        items=mock_models,
        initiator=init_inmemory_model_storage)
    yield model_storage
    release_sqlite_storage(model_storage)
| |
| |
class _MockModel(mixins.ModelMixin):
    """Column definitions shared by ``MockModel1`` and ``MockModel2``.

    Each container/scalar type comes in pairs so that a test can register one attribute for
    tracking and leave its sibling unregistered.
    """
    name = Column(Text)
    # dict-typed columns
    dict1 = Column(modeling_types.Dict)
    dict2 = Column(modeling_types.Dict)
    # list-typed columns
    list1 = Column(modeling_types.List)
    list2 = Column(modeling_types.List)
    # integer columns
    int1 = Column(Integer)
    int2 = Column(Integer)
    # second text column, next to ``name``
    string2 = Column(Text)
| |
| |
class MockModel1(_MockModel, models.aria_declarative_base):
    """Mock model with the ``_MockModel`` columns, declared under table ``mock_model_1``."""
    __tablename__ = 'mock_model_1'
| |
| |
class MockModel2(_MockModel, models.aria_declarative_base):
    """Mock model with the ``_MockModel`` columns, declared under table ``mock_model_2``."""
    __tablename__ = 'mock_model_2'
| |
| |
class StrictMockModel(mixins.ModelMixin, models.aria_declarative_base):
    """Mock model with strict dict/list columns, declared under table ``strict_mock_model``.

    ``test_track_changes_to_strict_dict`` expects ``ValueFormatException`` when non-string
    keys, values or items are assigned to these columns.
    """
    __tablename__ = 'strict_mock_model'

    # NOTE(review): ``basestring`` is a Python 2-only builtin, so this module cannot be
    # imported on Python 3 as written - confirm the supported interpreter before changing it.
    strict_dict = Column(modeling_types.StrictDict(basestring, basestring))
    strict_list = Column(modeling_types.StrictList(basestring))