blob: 53d5fa9a22da58e2aaa7243370f48e6af9e154e1 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import util
import logging
import zopkio.runtime as runtime
from kafka import SimpleProducer, SimpleConsumer
logger = logging.getLogger(__name__)
DEPLOYER = 'samza_job_deployer'
JOB_ID = 'negate_number'
PACKAGE_ID = 'tests'
CONFIG_FILE = 'config/negate-number.properties'
TEST_INPUT_TOPIC = 'samza-test-topic'
TEST_OUTPUT_TOPIC = 'samza-test-topic-output'
NUM_MESSAGES = 50
def test_samza_job():
"""
Runs a job that reads converts input strings to integers, negates the
integer, and outputs to a Kafka topic.
"""
_load_data()
util.start_job(PACKAGE_ID, JOB_ID, CONFIG_FILE)
util.await_job(PACKAGE_ID, JOB_ID)
def validate_samza_job():
"""
Validates that negate-number negated all messages, and sent the output to
samza-test-topic-output.
"""
logger.info('Running validate_samza_job')
kafka = util.get_kafka_client()
kafka.ensure_topic_exists(TEST_OUTPUT_TOPIC)
consumer = SimpleConsumer(kafka, 'samza-test-group', TEST_OUTPUT_TOPIC)
messages = consumer.get_messages(count=NUM_MESSAGES, block=True, timeout=300)
message_count = len(messages)
assert NUM_MESSAGES == message_count, 'Expected {0} lines, but found {1}'.format(NUM_MESSAGES, message_count)
for message in map(lambda m: m.message.value, messages):
assert int(message) < 0 , 'Expected negative integer but received {0}'.format(message)
kafka.close()
def _load_data():
"""
Sends 50 messages (1 .. 50) to samza-test-topic.
"""
logger.info('Running test_samza_job')
kafka = util.get_kafka_client()
kafka.ensure_topic_exists(TEST_INPUT_TOPIC)
producer = SimpleProducer(
kafka,
async=False,
req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT,
ack_timeout=30000)
for i in range(1, NUM_MESSAGES + 1):
producer.send_messages(TEST_INPUT_TOPIC, str(i))
kafka.close()