blob: bee93953cfef5625c5b3677b43cfe9e5381ecabf [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import time

import pytest

from pyignite import Client, AioClient
from pyignite.monitoring import ConnectionEventListener, ConnectionLostEvent, ConnectionClosedEvent, \
    HandshakeSuccessEvent, HandshakeFailedEvent, HandshakeStartEvent
from tests.util import start_ignite_gen, kill_process_tree
@pytest.fixture(autouse=True)
def server1():
    """Autouse fixture that drives ``start_ignite_gen(idx=1)`` around every test.

    Setup and teardown are whatever ``start_ignite_gen`` implements
    (presumably starting and stopping Ignite node 1 -- confirm in tests.util).
    """
    node_lifecycle = start_ignite_gen(idx=1)
    yield from node_lifecycle
@pytest.fixture(autouse=True)
def server2():
    """Autouse fixture that drives ``start_ignite_gen(idx=2)`` around every test.

    The tests in this module read ``.pid`` from the yielded value in order to
    kill that node. Setup and teardown are whatever ``start_ignite_gen``
    implements (presumably starting and stopping Ignite node 2 -- confirm in
    tests.util).
    """
    node_lifecycle = start_ignite_gen(idx=2)
    yield from node_lifecycle
# Module-wide log of every connection event handed to RecordingConnectionEventListener.
events = []


def teardown_function():
    """Empty the shared ``events`` log in place so the next test starts clean."""
    # Slice deletion mutates the existing list object instead of rebinding the name.
    del events[:]
class RecordingConnectionEventListener(ConnectionEventListener):
    """Connection listener that appends every event it receives to the module-level ``events`` list."""

    @staticmethod
    def _record(event):
        """Store ``event`` in the shared event log."""
        events.append(event)

    def on_handshake_start(self, event):
        self._record(event)

    def on_handshake_success(self, event):
        self._record(event)

    def on_handshake_fail(self, event):
        self._record(event)

    def on_authentication_fail(self, event):
        self._record(event)

    def on_connection_closed(self, event):
        self._record(event)

    def on_connection_lost(self, event):
        self._record(event)
def test_events(request, server2):
    """Check the connection events published by the synchronous client.

    Connects to both nodes (ports 10801 and 10802), kills the ``server2``
    process and keeps issuing puts until a ``ConnectionLostEvent`` has been
    recorded, then validates every recorded event.

    :param request: pytest request object; the test name is used as the cache name.
    :param server2: value yielded by the ``server2`` fixture; its ``pid`` is killed.
    """
    # Upper bound (seconds) for the lost-connection event to show up, so that a
    # missing event fails the test instead of hanging the whole run.
    wait_timeout = 60

    client = Client(event_listeners=[RecordingConnectionEventListener()])
    with client.connect([('127.0.0.1', 10800 + idx) for idx in range(1, 3)]):
        protocol_context = client.protocol_context
        nodes = {conn.port: conn for conn in client._nodes}
        cache = client.get_or_create_cache(request.node.name)

        kill_process_tree(server2.pid)

        deadline = time.monotonic() + wait_timeout
        while True:
            try:
                cache.put(random.randint(0, 1000), 1)
            except Exception:
                # Failed puts are tolerated on purpose: only the published events
                # matter here. KeyboardInterrupt/SystemExit still propagate.
                pass

            if any(isinstance(e, ConnectionLostEvent) for e in events):
                break

            assert time.monotonic() < deadline, \
                f'ConnectionLostEvent was not published within {wait_timeout} seconds'

    # Checked after the ``with`` block has exited; the assertions expect a
    # ConnectionClosedEvent, presumably published when the client closes.
    __assert_events(nodes, protocol_context)
@pytest.mark.asyncio
async def test_events_async(request, server2):
    """Check the connection events published by the asynchronous client.

    Connects to both nodes (ports 10801 and 10802), kills the ``server2``
    process and keeps issuing puts until a ``ConnectionLostEvent`` has been
    recorded, then validates every recorded event.

    :param request: pytest request object; the test name is used as the cache name.
    :param server2: value yielded by the ``server2`` fixture; its ``pid`` is killed.
    """
    # Upper bound (seconds) for the lost-connection event to show up, so that a
    # missing event fails the test instead of hanging the whole run.
    wait_timeout = 60

    client = AioClient(event_listeners=[RecordingConnectionEventListener()])
    async with client.connect([('127.0.0.1', 10800 + idx) for idx in range(1, 3)]):
        protocol_context = client.protocol_context
        nodes = {conn.port: conn for conn in client._nodes}
        cache = await client.get_or_create_cache(request.node.name)

        kill_process_tree(server2.pid)

        deadline = time.monotonic() + wait_timeout
        while True:
            try:
                await cache.put(random.randint(0, 1000), 1)
            except Exception:
                # Failed puts are tolerated on purpose: only the published events
                # matter here. asyncio.CancelledError is a BaseException, so task
                # cancellation is no longer swallowed by this handler.
                pass

            if any(isinstance(e, ConnectionLostEvent) for e in events):
                break

            assert time.monotonic() < deadline, \
                f'ConnectionLostEvent was not published within {wait_timeout} seconds'

    # Checked after the ``async with`` block has exited; the assertions expect a
    # ConnectionClosedEvent, presumably published when the client closes.
    __assert_events(nodes, protocol_context)
def __assert_events(nodes, protocol_context):
    """Validate the contents of the module-level ``events`` log.

    :param nodes: mapping of port -> connection object (its ``uuid`` is compared
        with the ``node_uuid`` carried by the events).
    :param protocol_context: protocol context every handshake event must carry.
    """
    lost_events = [e for e in events if isinstance(e, ConnectionLostEvent)]
    assert len(lost_events) == 1

    # ConnectionLostEvent is a subclass of ConnectionClosedEvent,
    # so plain closes are matched by exact type.
    closed_events = [e for e in events if type(e) is ConnectionClosedEvent]
    assert len(closed_events) == 1

    handshake_ok_events = [e for e in events if isinstance(e, HandshakeSuccessEvent)]
    assert len(handshake_ok_events) == 2

    all_ports = {10801, 10802}

    for event in events:
        assert event.host == '127.0.0.1'

        # The order of the isinstance checks below is significant; keep it.
        if isinstance(event, ConnectionLostEvent):
            assert event.port == 10802
            assert event.node_uuid == str(nodes[event.port].uuid)
            assert event.error_msg
            continue

        if isinstance(event, HandshakeStartEvent):
            assert event.protocol_context == protocol_context
            assert event.port in all_ports
            continue

        if isinstance(event, HandshakeFailedEvent):
            assert event.port == 10802
            assert event.protocol_context == protocol_context
            assert event.error_msg
            continue

        if isinstance(event, HandshakeSuccessEvent):
            assert event.port in all_ports
            assert event.node_uuid == str(nodes[event.port].uuid)
            assert event.protocol_context == protocol_context
            continue

        if isinstance(event, ConnectionClosedEvent):
            assert event.port == 10801
            assert event.node_uuid == str(nodes[event.port].uuid)