| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import json |
| import threading |
| |
| import pytest |
| from kazoo.client import KazooClient |
| from kazoo.exceptions import KazooException |
| from kazoo.security import ACL, Id, Permissions as KazooPermissions |
| from mock import MagicMock, call, create_autospec, patch |
| from twitter.common.contextutil import temporary_file |
| from twitter.common.quantity import Amount, Time |
| from twitter.common.testing.clock import ThreadedClock |
| from twitter.common.zookeeper.serverset import Endpoint, ServerSet |
| |
| from apache.aurora.config.schema.base import HealthCheckConfig |
| from apache.aurora.executor.common.announcer import ( |
| Announcer, |
| DefaultAnnouncerCheckerProvider, |
| ServerSetJoinThread, |
| make_endpoints, |
| make_zk_auth |
| ) |
| from apache.aurora.executor.common.announcer_zkauth_schema import Access, Auth, Permissions, ZkAuth |
| |
| |
| def test_serverset_join_thread(): |
| join = threading.Event() |
| joined = threading.Event() |
| |
| def join_function(): |
| joined.set() |
| |
| ssjt = ServerSetJoinThread(join, join_function, loop_wait=Amount(1, Time.MILLISECONDS)) |
| ssjt.start() |
| ssjt.stop() |
| ssjt.join(timeout=1.0) |
| assert not ssjt.is_alive() |
| assert not joined.is_set() |
| |
| ssjt = ServerSetJoinThread(join, join_function, loop_wait=Amount(1, Time.MILLISECONDS)) |
| ssjt.start() |
| |
| def test_loop(): |
| join.set() |
| joined.wait(timeout=1.0) |
| assert not join.is_set() # join is cleared |
| assert joined.is_set() # joined has been called |
| joined.clear() |
| |
| # validate that the loop is working |
| test_loop() |
| test_loop() |
| |
| ssjt.stop() |
| ssjt.join(timeout=1.0) |
| assert not ssjt.is_alive() |
| assert not joined.is_set() |
| |
| |
| def test_announcer_under_normal_circumstances(): |
| joined = threading.Event() |
| |
| def joined_side_effect(*args, **kw): |
| joined.set() |
| return 'membership foo' |
| |
| mock_serverset = create_autospec(spec=ServerSet, instance=True) |
| mock_serverset.join = MagicMock() |
| mock_serverset.join.side_effect = joined_side_effect |
| mock_serverset.cancel = MagicMock() |
| |
| endpoint = Endpoint('localhost', 12345) |
| clock = ThreadedClock(31337.0) |
| |
| announcer = Announcer(mock_serverset, endpoint, clock=clock) |
| assert announcer.disconnected_time() == 0.0 |
| clock.tick(1.0) |
| assert announcer.disconnected_time() == 1.0, ( |
| 'Announcer should advance disconnection time when not yet initially connected.') |
| |
| announcer.start() |
| |
| try: |
| joined.wait(timeout=1.0) |
| assert joined.is_set() |
| |
| assert announcer.disconnected_time() == 0.0 |
| clock.tick(1.0) |
| assert announcer.disconnected_time() == 0.0, ( |
| 'Announcer should not advance disconnection time when connected.') |
| assert announcer._membership == 'membership foo' |
| |
| finally: |
| announcer.stop() |
| |
| mock_serverset.cancel.assert_called_with('membership foo') |
| |
| assert announcer.disconnected_time() == 0.0 |
| clock.tick(1.0) |
| assert announcer.disconnected_time() == 0.0, ( |
| 'Announcer should not advance disconnection time when stopped.') |
| |
| |
| @pytest.mark.skipif('True', reason='Flaky test (AURORA-639)') |
| def test_announcer_on_expiration(): |
| joined = threading.Event() |
| operations = [] |
| |
| def joined_side_effect(*args, **kw): |
| # 'global' does not work within python nested functions, so we cannot use a |
| # counter here, so instead we do append/len (see PEP-3104) |
| operations.append(1) |
| if len(operations) == 1 or len(operations) == 3: |
| joined.set() |
| return 'membership %d' % len(operations) |
| else: |
| raise KazooException('Failed to reconnect') |
| |
| mock_serverset = create_autospec(spec=ServerSet, instance=True) |
| mock_serverset.join = MagicMock() |
| mock_serverset.join.side_effect = joined_side_effect |
| mock_serverset.cancel = MagicMock() |
| |
| endpoint = Endpoint('localhost', 12345) |
| clock = ThreadedClock(31337.0) |
| |
| announcer = Announcer( |
| mock_serverset, endpoint, clock=clock, exception_wait=Amount(2, Time.SECONDS)) |
| announcer.start() |
| |
| try: |
| joined.wait(timeout=1.0) |
| assert joined.is_set() |
| assert announcer._membership == 'membership 1' |
| assert announcer.disconnected_time() == 0.0 |
| clock.tick(1.0) |
| assert announcer.disconnected_time() == 0.0 |
| announcer.on_expiration() # expect exception |
| clock.tick(1.0) |
| assert announcer.disconnected_time() == 1.0, ( |
| 'Announcer should be disconnected on expiration.') |
| clock.tick(10.0) |
| assert announcer.disconnected_time() == 0.0, ( |
| 'Announcer should not advance disconnection time when connected.') |
| assert announcer._membership == 'membership 3' |
| |
| finally: |
| announcer.stop() |
| |
| |
| @pytest.mark.skipif('True', reason='Flaky test (AURORA-639)') |
| def test_announcer_under_abnormal_circumstances(): |
| mock_serverset = create_autospec(spec=ServerSet, instance=True) |
| mock_serverset.join = MagicMock() |
| mock_serverset.join.side_effect = [ |
| KazooException('Whoops the ensemble is down!'), |
| 'member0001', |
| ] |
| mock_serverset.cancel = MagicMock() |
| |
| endpoint = Endpoint('localhost', 12345) |
| clock = ThreadedClock(31337.0) |
| |
| announcer = Announcer( |
| mock_serverset, endpoint, clock=clock, exception_wait=Amount(2, Time.SECONDS)) |
| announcer.start() |
| |
| try: |
| clock.tick(1.0) |
| assert announcer.disconnected_time() == 1.0 |
| clock.tick(2.0) |
| assert announcer.disconnected_time() == 0.0, ( |
| 'Announcer should recover after an exception thrown internally.') |
| assert announcer._membership == 'member0001' |
| finally: |
| announcer.stop() |
| |
| |
| def make_assigned_task(thermos_config, assigned_ports=None): |
| from gen.apache.aurora.api.constants import AURORA_EXECUTOR_NAME |
| from gen.apache.aurora.api.ttypes import ( |
| AssignedTask, |
| ExecutorConfig, |
| JobKey, |
| TaskConfig |
| ) |
| |
| assigned_ports = assigned_ports or {} |
| executor_config = ExecutorConfig(name=AURORA_EXECUTOR_NAME, data=thermos_config.json_dumps()) |
| task_config = TaskConfig( |
| job=JobKey( |
| role=thermos_config.role().get(), |
| environment="prod", |
| name=thermos_config.name().get()), |
| executorConfig=executor_config) |
| |
| return AssignedTask( |
| instanceId=12345, |
| task=task_config, |
| assignedPorts=assigned_ports, |
| slaveHost='test-host') |
| |
| |
| def make_job(role, environment, name, primary_port, portmap, zk_path=None): |
| from apache.aurora.config.schema.base import ( |
| Announcer, |
| Job, |
| Process, |
| Resources, |
| Task, |
| ) |
| task = Task( |
| name='ignore2', |
| processes=[Process(name='ignore3', cmdline='ignore4')], |
| resources=Resources(cpu=1, ram=1, disk=1)) |
| if zk_path: |
| announce = Announcer(primary_port=primary_port, portmap=portmap, zk_path=zk_path) |
| else: |
| announce = Announcer(primary_port=primary_port, portmap=portmap) |
| job = Job( |
| role=role, |
| environment=environment, |
| name=name, |
| cluster='ignore1', |
| task=task, |
| announce=announce) |
| return job |
| |
| |
| def test_make_empty_endpoints(): |
| hostname = 'aurora.example.com' |
| portmap = {} |
| primary_port = 'http' |
| |
| # test no bound 'http' port |
| primary, additional = make_endpoints(hostname, portmap, primary_port) |
| assert primary == Endpoint(hostname, 0) |
| assert additional == {} |
| |
| |
| @patch('apache.aurora.executor.common.announcer.ServerSet') |
| @patch('apache.aurora.executor.common.announcer.KazooClient') |
| def test_announcer_provider_with_timeout(mock_client_provider, mock_serverset_provider): |
| mock_client = create_autospec(spec=KazooClient, instance=True) |
| mock_client_provider.return_value = mock_client |
| client_connect_event = threading.Event() |
| mock_client.start_async.return_value = client_connect_event |
| |
| mock_serverset = create_autospec(spec=ServerSet, instance=True) |
| mock_serverset_provider.return_value = mock_serverset |
| |
| dap = DefaultAnnouncerCheckerProvider('zookeeper.example.com', root='/aurora') |
| job = make_job('aurora', 'prod', 'proxy', 'primary', portmap={'http': 80, 'admin': 'primary'}) |
| |
| health_check_config = HealthCheckConfig(initial_interval_secs=0.1, interval_secs=0.1) |
| job = job(health_check_config=health_check_config) |
| assigned_task = make_assigned_task(job, assigned_ports={'primary': 12345}) |
| checker = dap.from_assigned_task(assigned_task, None) |
| |
| mock_client.start_async.assert_called_once_with() |
| mock_serverset_provider.assert_called_once_with(mock_client, '/aurora/aurora/prod/proxy') |
| |
| checker.start() |
| checker.start_event.wait() |
| |
| assert checker.status is not None |
| |
| |
| @patch('apache.aurora.executor.common.announcer.ServerSet') |
| @patch('apache.aurora.executor.common.announcer.KazooClient') |
| def test_default_announcer_provider(mock_client_provider, mock_serverset_provider): |
| mock_client = create_autospec(spec=KazooClient, instance=True) |
| mock_client_provider.return_value = mock_client |
| mock_serverset = create_autospec(spec=ServerSet, instance=True) |
| mock_serverset_provider.return_value = mock_serverset |
| |
| dap = DefaultAnnouncerCheckerProvider('zookeeper.example.com', root='/aurora') |
| job = make_job('aurora', 'prod', 'proxy', 'primary', portmap={'http': 80, 'admin': 'primary'}) |
| assigned_task = make_assigned_task(job, assigned_ports={'primary': 12345}) |
| checker = dap.from_assigned_task(assigned_task, None) |
| |
| mock_client.start_async.assert_called_once_with() |
| mock_serverset_provider.assert_called_once_with(mock_client, '/aurora/aurora/prod/proxy') |
| assert checker.name() == 'announcer' |
| assert checker.status is None |
| |
| |
| def test_default_announcer_provider_without_announce(): |
| from pystachio import Empty |
| |
| job = make_job('aurora', 'prod', 'proxy', 'primary', portmap={}) |
| job = job(announce=Empty) |
| assigned_task = make_assigned_task(job) |
| |
| assert DefaultAnnouncerCheckerProvider('foo.bar').from_assigned_task(assigned_task, None) is None |
| |
| |
| @patch('apache.aurora.executor.common.announcer.ServerSet') |
| @patch('apache.aurora.executor.common.announcer.KazooClient') |
| def test_announcer_provider_with_zkpath(mock_client_provider, mock_serverset_provider): |
| mock_client = create_autospec(spec=KazooClient, instance=True) |
| mock_client_provider.return_value = mock_client |
| mock_serverset = create_autospec(spec=ServerSet, instance=True) |
| mock_serverset_provider.return_value = mock_serverset |
| |
| dap = DefaultAnnouncerCheckerProvider('zookeeper.example.com', '', True) |
| job = make_job('aurora', 'prod', 'proxy', 'primary', portmap={'http': 80, 'admin': 'primary'}, |
| zk_path='/uns/v1/sjc1-prod/us1/service/prod') |
| assigned_task = make_assigned_task(job, assigned_ports={'primary': 12345}) |
| checker = dap.from_assigned_task(assigned_task, None) |
| |
| mock_client.start_async.assert_called_once_with() |
| mock_serverset_provider.assert_called_once_with(mock_client, '/uns/v1/sjc1-prod/us1/service/prod') |
| assert checker.name() == 'announcer' |
| assert checker.status is None |
| |
| |
| @patch('apache.aurora.executor.common.announcer.ServerSet') |
| @patch('apache.aurora.executor.common.announcer.KazooClient') |
| @patch('apache.aurora.executor.common.announcer.Endpoint') |
| def test_announcer_provider_with_hostname(endpoint_mock_provider, |
| mock_client_provider, mock_serverset_provider): |
| mock_client = create_autospec(spec=KazooClient, instance=True) |
| mock_client_provider.return_value = mock_client |
| mock_serverset = create_autospec(spec=ServerSet, instance=True) |
| mock_serverset_provider.return_value = mock_serverset |
| |
| dap = DefaultAnnouncerCheckerProvider('zookeeper.example.com', '/aurora', False, '10.2.3.4') |
| job = make_job('aurora', 'prod', 'proxy', 'primary', portmap={}) |
| assigned_task = make_assigned_task(job, assigned_ports={'primary': 12345}) |
| dap.from_assigned_task(assigned_task, None) |
| |
| assert endpoint_mock_provider.mock_calls == [call('10.2.3.4', 12345), call('10.2.3.4', 12345)] |
| |
| |
| def generate_zk_auth_json(): |
| auth = { |
| 'auth': [ |
| { |
| 'scheme': 'digest', |
| 'credential': 'user:pass' |
| } |
| ], |
| 'acl': [ |
| { |
| 'scheme': 'digest', |
| 'credential': 'user:pass', |
| 'permissions': { |
| 'read': True, |
| 'write': True, |
| 'create': True, |
| 'delete': True, |
| 'admin': False |
| } |
| } |
| ] |
| } |
| return json.dumps(auth) |
| |
| |
| def test_make_zk_auth_with_good_config(): |
| with temporary_file() as fp: |
| fp.write(generate_zk_auth_json()) |
| fp.flush() |
| |
| zk_auth = make_zk_auth(fp.name) |
| perms = Permissions(read=True, write=True, delete=True, create=True, admin=False) |
| assert zk_auth.acl()[0] == Access(scheme='digest', |
| credential='user:pass', |
| permissions=perms) |
| assert zk_auth.auth()[0] == Auth(scheme='digest', credential='user:pass') |
| |
| |
| def test_make_zk_auth_with_no_config(): |
| auth = make_zk_auth(None) |
| assert auth is None |
| |
| |
| def test_make_zk_auth_with_bad_config(): |
| with pytest.raises(SystemExit): |
| make_zk_auth('file-not-present') |
| |
| with temporary_file() as fp: |
| fp.write('Bad json') |
| fp.flush() |
| |
| with pytest.raises(SystemExit): |
| make_zk_auth(fp.name) |
| |
| |
| def test_make_zk_auth_config_validation(): |
| invalid_configs = [ |
| # No credential in auth |
| {'auth': [{'scheme': 's'}], |
| 'acl': [{'scheme': 's', 'credential': 'c', 'permissions': {'read': True}}]}, |
| # Acl is not a list |
| {'auth': [{'scheme': 's', 'credential': 'c'}], |
| 'acl': {'scheme': 's', 'credential': 'c', 'permissions': {'read': True}}}, |
| # No credential in acl |
| {'auth': [{'scheme': 's', 'credential': 'c'}], |
| 'acl': [{'scheme': 's', 'permissions': {'read': True}}]}, |
| # permissions is not an object |
| {'auth': [{'scheme': 's', 'credential': 'c'}], |
| 'acl': [{'scheme': 's', 'credential': 'c', 'permissions': 'non-object'}]}, |
| # permissions object has unrecognized property |
| {'auth': [{'scheme': 's', 'credential': 'c'}], |
| 'acl': [{'scheme': 's', 'credential': 'c', 'permissions': {'extraprop': True}}]}, |
| # non boolean property in permissions object |
| {'auth': [{'scheme': 's', 'credential': 'c'}], |
| 'acl': [{'scheme': 's', 'credential': 'c', 'permissions': {'read': 'non-bool'}}]}, |
| ] |
| for invalid_config in invalid_configs: |
| with temporary_file() as fp: |
| fp.write(json.dumps(invalid_config)) |
| fp.flush() |
| |
| with pytest.raises(SystemExit): |
| make_zk_auth(fp.name) |
| |
| |
| @patch('apache.aurora.executor.common.announcer.ServerSet') |
| @patch('apache.aurora.executor.common.announcer.KazooClient') |
| def test_announcer_provider_with_acl(mock_client_provider, mock_serverset_provider): |
| mock_client = create_autospec(spec=KazooClient, instance=True) |
| mock_client_provider.return_value = mock_client |
| mock_serverset = create_autospec(spec=ServerSet, instance=True) |
| mock_serverset_provider.return_value = mock_serverset |
| |
| zk_auth = ZkAuth(auth=[Auth(scheme='s', credential='ca')], |
| acl=[Access(scheme='s', credential='cl', permissions=Permissions(create=True))]) |
| |
| dap = DefaultAnnouncerCheckerProvider('zookeeper.example.com', '', False, None, zk_auth) |
| job = make_job('aurora', 'prod', 'proxy', 'primary', portmap={}) |
| assigned_task = make_assigned_task(job, assigned_ports={'primary': 12345}) |
| dap.from_assigned_task(assigned_task, None) |
| |
| mock_client_provider.assert_called_once_with('zookeeper.example.com', |
| connection_retry=dap.DEFAULT_RETRY_POLICY, |
| auth_data=[('s', 'ca')], |
| default_acl=[ACL(KazooPermissions.CREATE, |
| Id('s', 'cl'))]) |