# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
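"""
Standalone integration tests for Samza's zookeeper-based group coordination.

Each test launches three stream processors on localhost, perturbs the group by
killing or pausing processors, and validates the JobModel published in zookeeper.
"""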
import logging
import threading
import time
import traceback
from kafka import SimpleProducer, SimpleConsumer
import util
from stream_processor import StreamProcessor
from zk_client import ZkClient
logger = logging.getLogger(__name__)
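## Number of messages produced into the input topic in each test run.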
NUM_MESSAGES = 50
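## Group coordination timeout in seconds; __get_job_model waits up to twice this for a new JobModel.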
GROUP_COORDINATION_TIMEOUT = 14
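## Output topic of the test job; exercised only by the hypothetical __drain_output_topic sketch below.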
TEST_OUTPUT_TOPIC = 'standalone_integration_test_kafka_output_topic'
zk_client = None
### TODO: In each test, add barrier state and processorId validations after fixing the data serialization format in zookeeper (SAMZA-1749).
def __purge_zk_data():
"""
Recursively deletes all data nodes created in zookeeper in a test-run.
"""
zk_client.purge_all_nodes()
def __pump_messages_into_input_topic():
    """
    Produces NUM_MESSAGES messages into the input topic: standalone_integration_test_kafka_input_topic.
    """
kafka_client = None
input_topic = 'standalone_integration_test_kafka_input_topic'
try:
kafka_client = util.get_kafka_client()
kafka_client.ensure_topic_exists(input_topic)
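        ## Synchronous producer: ACK_AFTER_CLUSTER_COMMIT makes each send wait (up to ack_timeout ms)
        ## until the message is committed by all in-sync replicas before returning.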
producer = SimpleProducer(kafka_client, async=False, req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT, ack_timeout=30000)
logger.info('Producing {0} messages to topic: {1}'.format(NUM_MESSAGES, input_topic))
for message_index in range(1, NUM_MESSAGES + 1):
producer.send_messages(input_topic, str(message_index))
    except Exception:
        logger.error(traceback.format_exc())
finally:
if kafka_client is not None:
kafka_client.close()
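## TEST_OUTPUT_TOPIC and SimpleConsumer are defined above but not exercised by the tests below.
## A minimal sketch of an output-side validation helper, assuming (hypothetically) that the
## processors mirror the input messages into TEST_OUTPUT_TOPIC; not wired into any test:
def __drain_output_topic(expected_message_count):
    """
    Consumes up to :param expected_message_count messages from TEST_OUTPUT_TOPIC and returns their values.
    """
    kafka_client = None
    try:
        kafka_client = util.get_kafka_client()
        kafka_client.ensure_topic_exists(TEST_OUTPUT_TOPIC)
        ## 'standalone-integration-test-group' is a hypothetical consumer group name.
        consumer = SimpleConsumer(kafka_client, 'standalone-integration-test-group', TEST_OUTPUT_TOPIC)
        ## Block for up to 30 seconds while draining the topic.
        messages = consumer.get_messages(count=expected_message_count, block=True, timeout=30)
        return [message.message.value for message in messages]
    finally:
        if kafka_client is not None:
            kafka_client.close()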
def __setup_processors():
"""
    Instantiates and schedules three stream processors for execution on localhost.
    :return: the instantiated stream processors, keyed by processor id.
"""
processors = {}
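    ## All three processors join the same group; zookeeper-based coordination elects one of them leader.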
for processor_id in ['standalone-processor-1', 'standalone-processor-2', 'standalone-processor-3']:
processors[processor_id] = StreamProcessor(host_name='localhost', processor_id=processor_id)
processors[processor_id].start()
return processors
def __tear_down_processors(processors):
"""
    Kills all the stream processors passed in :param processors.
"""
for processor_id, processor in processors.iteritems():
logger.info("Killing processor: {0}.".format(processor_id))
processor.kill()
def __setup_zk_client():
"""
    Instantiates a ZkClient to connect to the zookeeper server on localhost.
"""
global zk_client
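    ## app_name and app_id are assumed to match the processors' job configuration, so this client
    ## observes the same zookeeper subtree the processors coordinate under.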
zk_client = ZkClient(zookeeper_host='127.0.0.1', zookeeper_port='2181', app_name='test-app-name', app_id='test-app-id')
zk_client.start()
def __teardown_zk_client():
"""
Stops the ZkClient.
"""
global zk_client
zk_client.stop()
def job_model_watcher(event, expected_processors):
    """
    Polls the latest JobModel from zookeeper every two seconds, for at most 30 seconds, and
    sets :param event once the JobModel's containers match :param expected_processors.
    """
    start_time_seconds = time.time()
    while time.time() - start_time_seconds <= 30:
        recent_job_model = zk_client.get_latest_job_model()
        if set(recent_job_model['containers'].keys()) == set(expected_processors):
            event.set()
            return
        time.sleep(2)
def __validate_job_model(job_model, killed_processors=[]):
    """
    Validates the TaskModel and the ContainerModel of :param job_model: every input topic partition
    should be assigned to a container, all live processors should be present in the JobModel, and the
    processors in :param killed_processors should not.
    """
    ## Validate the TaskModel. Check that all the partitions are assigned to the containers.
    expected_ssps = [{u'keyBucket': -1, u'partition': 0, u'system': u'testSystemName', u'stream': u'standalone_integration_test_kafka_input_topic'},
                     {u'keyBucket': -1, u'partition': 1, u'system': u'testSystemName', u'stream': u'standalone_integration_test_kafka_input_topic'},
                     {u'keyBucket': -1, u'partition': 2, u'system': u'testSystemName', u'stream': u'standalone_integration_test_kafka_input_topic'}]
    actual_ssps = []
    for container_id, tasks in job_model['containers'].iteritems():
        for task_name, task in tasks['tasks'].iteritems():
            actual_ssps.append(task['system-stream-partitions'][0])
    actual_ssps.sort()
    assert expected_ssps == actual_ssps, 'Expected ssps: {0}, Actual ssps: {1}.'.format(expected_ssps, actual_ssps)
    ## Validate the ContainerModel. Live processors should be present in the JobModel; killed processors should not.
    active_processors = zk_client.get_active_processors()
    assert set(active_processors) == set(job_model['containers'].keys()), 'Active processors: {0} do not match the containers in JobModel: {1}.'.format(active_processors, job_model['containers'].keys())
    for processor_id in killed_processors:
        assert processor_id not in job_model['containers'], 'Killed processor: {0} still exists in JobModel: {1}.'.format(processor_id, job_model)
def __get_job_model(expected_processors):
    """
    Waits until a JobModel whose containers match :param expected_processors is published in zookeeper
    (bounded by twice the group coordination timeout) and returns the latest JobModel.
    """
    event = threading.Event()
    ## Run the polling watcher on a daemon thread so that event.wait below bounds the total wait time.
    watcher_thread = threading.Thread(target=job_model_watcher, kwargs={'event': event, 'expected_processors': expected_processors})
    watcher_thread.daemon = True
    watcher_thread.start()
    event.wait(2 * GROUP_COORDINATION_TIMEOUT)
    return zk_client.get_latest_job_model()
def test_kill_leader():
"""
    Launches three stream processors. Kills the leader processor. Waits for the group coordination timeout
    and verifies that the final JobModel contains only the two followers.
"""
processors = {}
try:
__setup_zk_client()
__pump_messages_into_input_topic()
processors = __setup_processors()
## Validations before killing the leader.
job_model = __get_job_model(expected_processors=processors.keys())
__validate_job_model(job_model, [])
leader_processor_id = zk_client.get_leader_processor_id()
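        ## Killing the leader should trigger re-election among the remaining followers and a new JobModel.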
processors.pop(leader_processor_id).kill()
## Validations after killing the leader.
job_model = __get_job_model(expected_processors=processors.keys())
assert leader_processor_id != zk_client.get_leader_processor_id(), '{0} is still the leader'.format(leader_processor_id)
__validate_job_model(job_model, [leader_processor_id])
except:
## Explicitly logging exception, since zopkio doesn't log complete stacktrace.
        logger.error(traceback.format_exc())
raise
finally:
__tear_down_processors(processors)
__purge_zk_data()
__teardown_zk_client()
def test_kill_one_follower():
"""
    Launches three stream processors. Kills one follower processor. Waits for the group coordination timeout
    and verifies that the final JobModel contains only the leader and the remaining follower.
"""
processors = {}
try:
__setup_zk_client()
__pump_messages_into_input_topic()
processors = __setup_processors()
leader_processor_id, killed_processors = zk_client.get_leader_processor_id(), []
## Validations before killing the follower.
job_model = __get_job_model(expected_processors=processors.keys())
__validate_job_model(job_model)
        for processor_id in processors.keys():
            if processor_id != leader_processor_id:
                follower = processors.pop(processor_id)
                follower.kill()
                ## Record the processor id so that __validate_job_model can assert its absence from the JobModel.
                killed_processors.append(processor_id)
                break
## Validations after killing the follower.
job_model = __get_job_model(expected_processors=processors.keys())
assert leader_processor_id == zk_client.get_leader_processor_id(), '{0} is not the leader'.format(leader_processor_id)
__validate_job_model(job_model, killed_processors)
except:
## Explicitly logging exception, since zopkio doesn't log complete stacktrace.
        logger.error(traceback.format_exc())
raise
finally:
__tear_down_processors(processors)
__purge_zk_data()
__teardown_zk_client()
def test_kill_multiple_followers():
"""
    Launches three stream processors. Kills both follower processors. Waits for the group coordination timeout
    and verifies that the final JobModel contains only the leader processor.
"""
processors = {}
try:
__setup_zk_client()
__pump_messages_into_input_topic()
processors = __setup_processors()
## Validations before killing the followers.
job_model = __get_job_model(expected_processors=processors.keys())
__validate_job_model(job_model)
leader_processor_id, killed_processors = zk_client.get_leader_processor_id(), []
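        ## Kill every processor except the leader; only the leader should remain in the next JobModel.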
        for processor_id in processors.keys():
            if processor_id != leader_processor_id:
                follower = processors.pop(processor_id)
                ## Record the processor id so that __validate_job_model can assert its absence from the JobModel.
                killed_processors.append(processor_id)
                follower.kill()
## Validations after killing the followers.
job_model = __get_job_model(expected_processors=processors.keys())
__validate_job_model(job_model, killed_processors)
except:
## Explicitly logging exception, since zopkio doesn't log complete stacktrace.
        logger.error(traceback.format_exc())
raise
finally:
__tear_down_processors(processors)
__purge_zk_data()
__teardown_zk_client()
def test_kill_leader_and_a_follower():
"""
    Launches three stream processors. Kills the leader and one follower processor.
    Waits for the group coordination timeout and verifies that the final JobModel contains only one processor.
"""
processors = {}
try:
__setup_zk_client()
__pump_messages_into_input_topic()
processors = __setup_processors()
leader_processor_id = zk_client.get_leader_processor_id()
## Validations before killing the leader and follower.
job_model = __get_job_model(expected_processors=processors.keys())
__validate_job_model(job_model)
killed_processors = [leader_processor_id]
processors.pop(leader_processor_id).kill()
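        ## Kill one arbitrary remaining follower; the loop breaks after the first pop.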
for processor_id in processors.keys():
follower = processors.pop(processor_id)
killed_processors.append(processor_id)
follower.kill()
break
## Validations after killing the leader and follower.
job_model = __get_job_model(expected_processors=processors.keys())
__validate_job_model(job_model, killed_processors)
except:
## Explicitly logging exception, since zopkio doesn't log complete stacktrace.
        logger.error(traceback.format_exc())
raise
finally:
__tear_down_processors(processors)
__purge_zk_data()
__teardown_zk_client()
def test_pause_resume_leader():
"""
    Launches three processors. Pauses the leader processor. Waits for the group coordination timeout
    and verifies that the JobModel no longer contains the leader processor. Resumes the leader processor,
    waits for the group coordination timeout again and verifies that the new JobModel contains the
    previously paused leader processor.
"""
processors = {}
try:
__setup_zk_client()
__pump_messages_into_input_topic()
processors = __setup_processors()
## Validations before pausing the leader.
job_model = __get_job_model(expected_processors=processors.keys())
__validate_job_model(job_model)
leader_processor_id = zk_client.get_leader_processor_id()
leader = processors.pop(leader_processor_id)
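        ## Pausing the leader until it misses the group coordination deadline should evict it from the
        ## group; resuming should let it rejoin and reappear in a subsequent JobModel.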
logger.info("Pausing the leader processor: {0}.".format(leader_processor_id))
leader.pause()
## Validations after pausing the leader.
job_model = __get_job_model(expected_processors=processors.keys())
__validate_job_model(job_model, [leader_processor_id])
logger.info("Resuming the leader processor: {0}.".format(leader_processor_id))
leader.resume()
## Validations after resuming the leader.
job_model = __get_job_model(expected_processors=processors.keys())
__validate_job_model(job_model)
leader.kill()
except:
## Explicitly logging exception, since zopkio doesn't log complete stacktrace.
        logger.error(traceback.format_exc())
raise
finally:
__tear_down_processors(processors)
__purge_zk_data()
__teardown_zk_client()