blob: e6c873a373ddc8c6ff0cc8886d8a5cf1ba11f3ec [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import util
import logging
import zopkio.runtime as runtime
from kafka import SimpleProducer, SimpleConsumer
logger = logging.getLogger(__name__)
PACKAGE_ID = 'tests'
KAFKA_JOB_ID = 'kafka-read-write-performance'
KAFKA_CONFIG_FILE = 'config/perf/kafka-read-write-performance.properties'
CONTAINER_JOB_ID = 'container-performance'
CONTAINER_CONFIG_FILE = 'config/perf/container-performance.properties'
TEST_INPUT_TOPIC = 'kafka-read-write-performance-input'
TEST_OUTPUT_TOPIC = 'kafka-read-write-performance-output'
NUM_MESSAGES = 1000000
MESSAGE = 'a' * 200
def test_kafka_read_write_performance():
"""
Runs a Samza job that reads from Kafka, and writes back out to it. The
writes/sec for the job is logged to the job's container.
"""
_load_data()
util.start_job(PACKAGE_ID, KAFKA_JOB_ID, KAFKA_CONFIG_FILE)
util.await_job(PACKAGE_ID, KAFKA_JOB_ID)
def validate_kafka_read_write_performance():
"""
Validates that all messages were sent to the output topic.
"""
logger.info('Running validate_kafka_read_write_performance')
kafka = util.get_kafka_client()
kafka.ensure_topic_exists(TEST_OUTPUT_TOPIC)
consumer = SimpleConsumer(
kafka,
'samza-test-group',
TEST_OUTPUT_TOPIC,
fetch_size_bytes=1000000,
buffer_size=32768,
max_buffer_size=None)
# wait 5 minutes to get all million messages
messages = consumer.get_messages(count=NUM_MESSAGES, block=True, timeout=300)
message_count = len(messages)
assert NUM_MESSAGES == message_count, 'Expected {0} lines, but found {1}'.format(NUM_MESSAGES, message_count)
kafka.close()
def test_container_performance():
"""
Runs TestPerformanceTask with a MockSystem to test how fast the
SamzaContainer can go.
"""
util.start_job(PACKAGE_ID, CONTAINER_JOB_ID, CONTAINER_CONFIG_FILE)
util.await_job(PACKAGE_ID, CONTAINER_JOB_ID)
def validate_container_performance():
pass
def _load_data():
"""
Sends 10 million messages to kafka-read-write-performance-input.
"""
logger.info('Running test_kafka_read_write_performance')
kafka = util.get_kafka_client()
kafka.ensure_topic_exists(TEST_INPUT_TOPIC)
producer = SimpleProducer(
kafka,
req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT,
ack_timeout=30000,
batch_send=True,
batch_send_every_n=200)
logger.info('Loading {0} test messages.'.format(NUM_MESSAGES))
for i in range(0, NUM_MESSAGES):
if i % 100000 == 0:
logger.info('Loaded {0} messages.'.format(i))
producer.send_messages(TEST_INPUT_TOPIC, MESSAGE)
kafka.close()