| INTEGRATION TEST |
| |
| * What does the test do? * |
| This is a correctness test that attempts to do partitioned messaging and use state. |
| It is meant to be run while killing Samza and Kafka machines to test fault tolerance. |
| It runs in iterations, and each iteration has a correctness criterion that is checked before launching the next iteration. |
| |
| Here are the jobs and their function: |
| emitter.samza: |
| This job takes input from the "epoch" topic. Epochs are numbered 0, 1, 2, ... |
| For each new epoch each emitter task does something like the following: |
|     for i = 0...count: |
|         send("emitted", i, partition) |
| where partition is the task partition id. |
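| |
| The emitter loop can be sketched in Python as follows. This is only an illustration, not the job's actual code: |
| emit_epoch and the send callback are hypothetical names, and the upper bound of the loop is assumed to be exclusive. |

```python
def emit_epoch(partition, count, send):
    """Send keys 0..count-1 to the "emitted" topic, tagged with this task's partition id.

    Assumption: the range is exclusive of count; the README does not say.
    """
    for i in range(count):
        send("emitted", i, partition)


# Collect the messages in a list instead of writing to Kafka.
sent = []
emit_epoch(partition=2, count=3,
           send=lambda topic, key, value: sent.append((topic, key, value)))
```

| Every partition emits the same keys, which is what lets the joiner match them up by key. |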
| |
| joiner.samza: |
| This job takes in the emitted values from emitter and joins them together by key. |
| When it has received an emitted value from each partition it outputs the key to the topic "completed". |
| To track which partitions have emitted their value, it keeps a store whose values are "|"-separated numbers. |
| The first entry is the epoch and the remaining entries are partitions that have emitted the key. |
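| |
| A minimal Python sketch of how such a store value could be updated is shown below. The function name, its |
| parameters, and the choice to discard state from an older epoch are assumptions made for illustration; |
| only the "epoch first, then partitions" layout comes from the description above. |

```python
def update_join_state(stored, epoch, partition, total_partitions):
    """Return (new_stored_value, completed) after one emitted value arrives.

    stored is the current store value for the key, e.g. "3|0|2", or None
    if the key has not been seen yet.
    """
    seen = set()
    if stored is not None:
        fields = stored.split("|")
        # Assumption: state left over from a different epoch is discarded.
        if int(fields[0]) == epoch:
            seen = {int(p) for p in fields[1:]}
    # Adding to a set makes a replayed message a no-op, so nothing is double counted.
    seen.add(partition)
    new_stored = "|".join([str(epoch)] + [str(p) for p in sorted(seen)])
    return new_stored, len(seen) == total_partitions


first = update_join_state(None, 3, 0, 2)
replayed = update_join_state("3|0", 3, 0, 2)
finished = update_join_state("3|0", 3, 1, 2)
next_epoch = update_join_state("3|0|1", 4, 1, 2)
```

| Because the partitions are kept as a set, processing the same message twice after a failure leaves the value unchanged. |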
| |
| checker.samza: |
| This job has a single partition and stores all the completed keys. |
| When all the keys are completed it sends an incremented epoch to the epoch topic, kicking off a new round. |
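| |
| The checker's behavior can be sketched as follows. This is a hedged illustration: on_completed, expected_keys |
| and the send callback are hypothetical, and clearing the set before the next round is an assumption. |

```python
def on_completed(completed, key, expected_keys, epoch, send):
    """Record a completed key and return the (possibly advanced) epoch."""
    completed.add(key)  # a set, so a replayed key is not counted twice
    if len(completed) == expected_keys:
        # Assumption: the stored keys are reset before the next round starts.
        completed.clear()
        send("epoch", epoch + 1)
        return epoch + 1
    return epoch


messages = []
done_keys = set()
send = lambda topic, value: messages.append((topic, value))
e = on_completed(done_keys, 0, 2, 5, send)
e = on_completed(done_keys, 0, 2, e, send)  # duplicate key, no progress
after_duplicate = (e, list(messages))
e = on_completed(done_keys, 1, 2, e, send)
```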
| |
| watcher.samza: |
| This job watches the epoch topic. If the epoch doesn't advance within some SLA this job sends an alert email. |
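| |
| One way to express the watcher's SLA check is sketched below. EpochWatcher and its methods are hypothetical names, |
| times are plain seconds passed in by the caller, and sending the alert email is left out. |

```python
class EpochWatcher:
    """Tracks when the epoch last advanced and reports SLA violations."""

    def __init__(self, sla_seconds, now):
        self.sla_seconds = sla_seconds
        self.last_epoch = None
        self.last_advance = now

    def on_epoch(self, epoch, now):
        # Only a strictly larger epoch counts as progress; replays are ignored.
        if self.last_epoch is None or epoch > self.last_epoch:
            self.last_epoch = epoch
            self.last_advance = now

    def should_alert(self, now):
        return now - self.last_advance > self.sla_seconds


watcher = EpochWatcher(sla_seconds=60, now=0)
watcher.on_epoch(0, now=10)
within_sla = watcher.should_alert(now=70)
past_sla = watcher.should_alert(now=71)
watcher.on_epoch(0, now=80)   # replayed epoch does not reset the timer
still_stalled = watcher.should_alert(now=81)
watcher.on_epoch(1, now=90)
recovered = watcher.should_alert(now=100)
```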
| |
| The state maintained by some of these jobs is slightly complex because of the need to make everything idempotent. |
| So, for example, instead of keeping a count of partitions in the joiner job, we keep the set of partitions |
| so that double counting cannot occur. |
| |
| * How to set up this test? * |
| |
| You need Paramiko installed to run these tests (pip install paramiko). |
| This test is meant to be used with hello-samza's bin/grid script (https://github.com/apache/samza-hello-samza). |
| |
| First, set up a few environment variables: |
| > export SAMZA_SRC=/path/to/samza |
| > export HELLO_SAMZA_SRC=/path/to/samza-hello-samza |
| > export DEPLOY_DIR=$HELLO_SAMZA_SRC/deploy |
| |
| Deploy ZooKeeper, YARN, and Kafka: |
| > cd $HELLO_SAMZA_SRC |
| > for i in zookeeper kafka yarn; do ./bin/grid install $i; ./bin/grid start $i; done |
| |
| Update "yarn.package.path" to point to $DEPLOY_DIR/samza-test_2.11-1.5.0-SNAPSHOT.tgz: |
| > cd $SAMZA_SRC |
| > vi samza-test/src/main/config/join/common.properties |
| yarn.package.path=file:///path/to/samza-hello-samza/deploy/samza-test_2.11-1.5.0-SNAPSHOT.tgz |
| |
| Then release and extract the test tarball: |
| > cd $SAMZA_SRC |
| > ./gradlew releaseTestJobs |
| > cp samza-test/build/distributions/samza-test_2.11-1.5.0-SNAPSHOT.tgz $DEPLOY_DIR |
| > mkdir $DEPLOY_DIR/samza |
| > tar -xvf $DEPLOY_DIR/samza-test_2.11-1.5.0-SNAPSHOT.tgz -C $DEPLOY_DIR/samza |
| |
| Finally, create the Kafka topics and start the Samza jobs: |
| > ./bin/setup-int-test.sh $DEPLOY_DIR |
| |
| You should now be able to view all 4 RUNNING jobs in the YARN UI at http://localhost:8088/cluster |
| |
| FAILURE TEST |
| |
| * What does the test do? * |
| This test checks the resilience of the system. |
| It periodically brings down a random container or Kafka broker in the system and waits to see if it recovers correctly. |
| |
| * How to set up this test? * |
| Verify that the 4 jobs are running via the YARN UI. |
| |
| Create a file listing the hosts that Kafka and the YARN containers are running on. For local deployments: |
| > echo "localhost" > /tmp/nodes.txt |
| |
| Then run the Python script to start the test: |
| > python $SAMZA_SRC/samza-test/src/main/python/samza_failure_testing.py \ |
|     --node-list=/tmp/nodes.txt \ |
|     --kill-time=60 \ |
|     --kafka-dir=$DEPLOY_DIR/kafka \ |
|     --kafka-host=localhost \ |
|     --yarn-dir=$DEPLOY_DIR/yarn \ |
|     --yarn-host=localhost \ |
|     --kill-kafka \ |
|     --kill-container |