INTEGRATION TEST
* What does the test do? *
This is a correctness test that exercises partitioned messaging and stateful processing.
It is meant to be run while killing Samza and Kafka machines to test fault-tolerance.
It runs in iterations, and each iteration has a correctness criterion that is checked before the next iteration is launched.
Here are the jobs and their function:
emitter.samza:
This job takes input from the "epoch" topic. Epochs are numbered 0, 1, 2, ...
For each new epoch each emitter task does something like the following:
    for i = 0...count:
      send("emitted", i, partition)
where partition is the task partition id.
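The emitter loop above can be sketched in Python. This is only an illustration of the pseudocode, not the job's actual code: the function name emit_epoch and the send callback are hypothetical, and the sketch assumes the range 0...count excludes count.

```python
from typing import Callable, List, Tuple


def emit_epoch(partition: int, count: int,
               send: Callable[[str, int, int], None]) -> None:
    """Send keys 0..count-1 to the "emitted" topic, tagged with the task's partition id.

    Assumption: the upper bound `count` is exclusive.
    """
    for key in range(count):
        send("emitted", key, partition)


# Usage: collect the messages in a list instead of writing to Kafka.
sent: List[Tuple[str, int, int]] = []
emit_epoch(partition=2, count=3,
           send=lambda topic, key, part: sent.append((topic, key, part)))
```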
joiner.samza:
This job takes in the emitted values from emitter and joins them together by key.
When it has received an emitted value from each partition it outputs the key to the topic "completed".
To track which partitions have emitted their value it keeps a store of "|"-separated numbers.
The first entry is the epoch and the remaining entries are partitions that have emitted the key.
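A minimal sketch of how such a store value could be updated, assuming one value per key. The function name update_state is hypothetical, and the handling of newer and older epochs (reset on a newer epoch, ignore a stale one) is an assumption rather than something stated above.

```python
from typing import Optional, Tuple


def update_state(stored: Optional[str], epoch: int, partition: int,
                 total_partitions: int) -> Tuple[str, bool]:
    """Return (new store value, whether every partition has emitted the key).

    Store format: "<epoch>|<partition>|<partition>|...".
    """
    if stored:
        fields = stored.split("|")
        stored_epoch = int(fields[0])
        seen = {int(p) for p in fields[1:]}
    else:
        stored_epoch, seen = epoch, set()

    if epoch > stored_epoch:
        # Assumption: a newer epoch starts a fresh set of partitions.
        stored_epoch, seen = epoch, set()
    elif epoch < stored_epoch:
        # Assumption: a message from an older epoch is ignored.
        return stored, False

    # Adding to a set makes a redelivered message a no-op.
    seen.add(partition)
    new_state = "|".join([str(stored_epoch)] + [str(p) for p in sorted(seen)])
    return new_state, len(seen) == total_partitions
```

Because the partitions are kept as a set, replaying the same message after a failure leaves the store value unchanged.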
checker.samza:
This job has a single partition and stores all the completed keys.
When all the keys are completed it sends an incremented epoch to the epoch topic, kicking off a new round.
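The checker's bookkeeping might look like the sketch below. The name record_completed is hypothetical, and the sketch assumes the checker knows the number of keys per epoch (count) and clears its set when a round finishes.

```python
from typing import Optional, Set


def record_completed(completed: Set[int], key: int, count: int,
                     epoch: int) -> Optional[int]:
    """Record a completed key; return the next epoch once all keys are in.

    Returns None while the current round is still incomplete.
    """
    completed.add(key)  # a set, so a duplicate "completed" message is harmless
    if len(completed) < count:
        return None
    completed.clear()   # assumption: state is reset for the next round
    return epoch + 1
```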
watcher.samza:
This job watches the epoch topic. If the epoch doesn't advance within some SLA this job sends an alert email.
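The SLA check can be sketched as follows. The class name EpochWatcher and its methods are hypothetical, timestamps are plain seconds, and sending the email itself is left out.

```python
from typing import Optional


class EpochWatcher:
    """Tracks when the epoch last advanced and reports SLA violations."""

    def __init__(self, sla_seconds: float, now: float) -> None:
        self.sla_seconds = sla_seconds
        self.last_epoch: Optional[int] = None
        self.last_advance = now

    def observe(self, epoch: int, now: float) -> None:
        # Only a strictly larger epoch counts as progress, so a replayed
        # epoch message does not reset the timer.
        if self.last_epoch is None or epoch > self.last_epoch:
            self.last_epoch = epoch
            self.last_advance = now

    def should_alert(self, now: float) -> bool:
        return now - self.last_advance > self.sla_seconds
```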
The state maintained by some of these jobs is slightly complex because of the need to make everything idempotent.
So, for example, instead of keeping the partition count in the joiner job we keep the set of partitions
so that double counting cannot occur.
* How to set up this test? *
You need to have Paramiko installed in order to run these tests. (pip install paramiko)
This test is meant to be used with hello-samza's bin/grid script (https://github.com/apache/samza-hello-samza).
First, set up a few environment variables:
> export SAMZA_SRC=/path/to/samza
> export HELLO_SAMZA_SRC=/path/to/samza-hello-samza
> export DEPLOY_DIR=$HELLO_SAMZA_SRC/deploy
Deploy Zookeeper, YARN and Kafka:
> cd $HELLO_SAMZA_SRC
> for i in zookeeper kafka yarn; do ./bin/grid install $i; ./bin/grid start $i; done
Update "yarn.package.path" to point to $DEPLOY_DIR/samza-test_2.11-1.6.0-SNAPSHOT.tgz:
> cd $SAMZA_SRC
> vi samza-test/src/main/config/join/common.properties
yarn.package.path=file:///path/to/samza-hello-samza/deploy/samza-test_2.11-1.6.0-SNAPSHOT.tgz
Then release and extract the test tarball:
> cd $SAMZA_SRC
> ./gradlew releaseTestJobs
> cp samza-test/build/distributions/samza-test_2.11-1.6.0-SNAPSHOT.tgz $DEPLOY_DIR
> mkdir $DEPLOY_DIR/samza
> tar -xvf $DEPLOY_DIR/samza-test_2.11-1.6.0-SNAPSHOT.tgz -C $DEPLOY_DIR/samza
Finally, create the Kafka topics and start the Samza jobs:
> ./bin/setup-int-test.sh $DEPLOY_DIR
You should now be able to view all 4 RUNNING jobs in the YARN UI at http://localhost:8088/cluster
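The same check can be scripted against the YARN ResourceManager REST API (the /ws/v1/cluster/apps endpoint). The sketch below is not part of the test suite; the helper names are hypothetical, and it assumes the ResourceManager is reachable at localhost:8088.

```python
import json
from typing import List
from urllib.request import urlopen


def running_app_names(payload: dict) -> List[str]:
    """Extract the names of RUNNING applications from a cluster-apps response."""
    # The "apps" field is null when no application matches the query.
    apps = (payload.get("apps") or {}).get("app") or []
    return sorted(a["name"] for a in apps if a.get("state") == "RUNNING")


def fetch_running_app_names(rm_url: str = "http://localhost:8088") -> List[str]:
    """Query the ResourceManager (assumed address) for RUNNING applications."""
    with urlopen(rm_url + "/ws/v1/cluster/apps?states=RUNNING") as resp:
        return running_app_names(json.load(resp))
```

Calling fetch_running_app_names() should return four entries once all jobs are up.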
FAILURE TEST
* What does the test do? *
This test checks the resilience of the system.
It periodically brings down a random container or Kafka broker in the system and waits to see if it recovers correctly.
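The random choice of a victim can be illustrated with the sketch below. It is not taken from samza_failure_testing.py: the function pick_target and its parameters are hypothetical, and the actual kill step (done over SSH in the real script) is omitted.

```python
import random
from typing import List, Tuple


def pick_target(nodes: List[str], kill_kafka: bool, kill_container: bool,
                rng: random.Random = random.Random()) -> Tuple[str, str]:
    """Pick what to kill ("kafka" or "container") and on which node."""
    kinds = []
    if kill_kafka:
        kinds.append("kafka")
    if kill_container:
        kinds.append("container")
    if not kinds:
        raise ValueError("nothing to kill: enable kafka and/or container kills")
    return rng.choice(kinds), rng.choice(nodes)


# Usage: with a single local node and only Kafka kills enabled,
# the choice is deterministic.
target = pick_target(["localhost"], kill_kafka=True, kill_container=False)
```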
* How to set up this test? *
Verify that the 4 jobs are running via the YARN UI.
Create a file listing the hosts that Kafka and the YARN containers are running on. For local deployments:
> echo "localhost" > /tmp/nodes.txt
Then run the Python script to start the test:
> python $SAMZA_SRC/samza-test/src/main/python/samza_failure_testing.py \
    --node-list=/tmp/nodes.txt \
    --kill-time=60 \
    --kafka-dir=$DEPLOY_DIR/kafka \
    --kafka-host=localhost \
    --yarn-dir=$DEPLOY_DIR/yarn \
    --yarn-host=localhost \
    --kill-kafka \
    --kill-container