blob: 83e0e5a08ce58720e85cbab112b111be05d41eba [file] [log] [blame]
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Validate the two required positional arguments before touching any state:
#   $1 - parallelism of the original execution
#   $2 - parallelism used when the job is resumed from the savepoint
if [[ -z "${1:-}" || -z "${2:-}" ]]; then
  echo "Usage: ./test_resume_savepoint.sh <original_dop> <new_dop>"
  exit 1
fi

# Both values are used in shell arithmetic and handed to `flink run -p`, so
# they must be positive decimal integers. A leading zero is rejected because
# bash arithmetic would interpret such a number as octal.
if ! [[ "$1" =~ ^[1-9][0-9]*$ && "$2" =~ ^[1-9][0-9]*$ ]]; then
  echo "Error: <original_dop> and <new_dop> must be positive integers, got '$1' and '$2'" >&2
  echo "Usage: ./test_resume_savepoint.sh <original_dop> <new_dop>"
  exit 1
fi

# Load common.sh from the directory of this script. It presumably provides the
# helpers and variables used below that are not defined in this file
# (e.g. TEST_DATA_DIR, FLINK_DIR, start_cluster, cleanup) - TODO confirm.
source "$(dirname "$0")"/common.sh
# Get Kafka 0.10.2.0 (Scala 2.11 build): either download it, or use the
# archive passed as the optional third argument.
mkdir -p "$TEST_DATA_DIR"
if [ -z "${3:-}" ]; then
  # need to download Kafka because no Kafka was specified on the invocation
  KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"
  echo "Downloading Kafka from $KAFKA_URL"
  # --fail makes curl return non-zero on HTTP errors instead of saving the
  # server's error page as kafka.tgz.
  if ! curl --fail --location "$KAFKA_URL" -o "$TEST_DATA_DIR/kafka.tgz"; then
    echo "Failed to download Kafka from $KAFKA_URL" >&2
    exit 1
  fi
else
  echo "Using specified Kafka from $3"
  if ! cp "$3" "$TEST_DATA_DIR/kafka.tgz"; then
    echo "Failed to copy Kafka archive from $3" >&2
    exit 1
  fi
fi

# Stop early on a corrupt or incomplete archive; nothing has been started yet.
if ! tar xzf "$TEST_DATA_DIR/kafka.tgz" -C "$TEST_DATA_DIR/"; then
  echo "Failed to extract $TEST_DATA_DIR/kafka.tgz" >&2
  exit 1
fi
KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0

# fix kafka config: keep ZooKeeper and Kafka data inside the test data directory
sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" "$KAFKA_DIR/config/zookeeper.properties"
sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" "$KAFKA_DIR/config/server.properties"

# start ZooKeeper first, then the Kafka broker, both as daemons
"$KAFKA_DIR"/bin/zookeeper-server-start.sh -daemon "$KAFKA_DIR/config/zookeeper.properties"
"$KAFKA_DIR"/bin/kafka-server-start.sh -daemon "$KAFKA_DIR/config/server.properties"
ORIGINAL_DOP=$1
NEW_DOP=$2

# Provision one slot more than the larger of the two parallelisms
# (presumably the extra slot hosts the events generator job - TODO confirm).
if (( ORIGINAL_DOP >= NEW_DOP )); then
  NUM_SLOTS=$(( ORIGINAL_DOP + 1 ))
else
  NUM_SLOTS=$(( NEW_DOP + 1 ))
fi

# modify configuration to have enough slots; keep a backup so the change can be reverted
cp "$FLINK_DIR/conf/flink-conf.yaml" "$FLINK_DIR/conf/flink-conf.yaml.bak"
# Replace whatever value is currently configured (not only the literal "1"),
# and add the key if the configuration does not set it at all.
if grep -q '^taskmanager\.numberOfTaskSlots:' "$FLINK_DIR/conf/flink-conf.yaml"; then
  sed -i -e "s/^taskmanager\.numberOfTaskSlots:.*/taskmanager.numberOfTaskSlots: $NUM_SLOTS/" "$FLINK_DIR/conf/flink-conf.yaml"
else
  echo "taskmanager.numberOfTaskSlots: $NUM_SLOTS" >> "$FLINK_DIR/conf/flink-conf.yaml"
fi

# modify configuration to use SLF4J reporter; we will be using this to monitor the state machine progress
cp "$FLINK_DIR"/opt/flink-metrics-slf4j-*.jar "$FLINK_DIR/lib/"
echo "metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter" >> "$FLINK_DIR/conf/flink-conf.yaml"
echo "metrics.reporter.slf4j.interval: 1 SECONDS" >> "$FLINK_DIR/conf/flink-conf.yaml"

start_cluster
# make sure to stop Kafka and ZooKeeper at the end, as well as cleaning up the Flink cluster and our modifications
# Tears down everything this test started or modified.
# Globals:   KAFKA_DIR (read), FLINK_DIR (read)
# Arguments: none
# Calls:     cleanup (not defined in this file)
#
# The function is safe to invoke more than once (e.g. when it is registered
# for several traps): a repeated call must not destroy the configuration that
# an earlier call already restored.
function test_cleanup {
  "$KAFKA_DIR"/bin/kafka-server-stop.sh
  "$KAFKA_DIR"/bin/zookeeper-server-stop.sh

  # revert our modifications to the Flink distribution;
  # only touch flink-conf.yaml while the backup still exists, otherwise a
  # second invocation would delete the already restored file
  if [ -f "$FLINK_DIR/conf/flink-conf.yaml.bak" ]; then
    mv -f "$FLINK_DIR/conf/flink-conf.yaml.bak" "$FLINK_DIR/conf/flink-conf.yaml"
  fi
  # -f: the reporter jar may already have been removed by an earlier invocation
  rm -f "$FLINK_DIR"/lib/flink-metrics-slf4j-*.jar

  # make sure to run regular cleanup as well
  cleanup
}
# Run test_cleanup exactly once on every way out of the script.
# The EXIT trap performs the cleanup. On Ctrl-C the INT trap only terminates
# the script (130 = 128 + SIGINT), which in turn fires the EXIT trap. Without
# the explicit exit, bash would run the handler and then keep executing the
# rest of the script against an already torn-down environment.
trap test_cleanup EXIT
trap 'exit 130' INT
# Wait until the znode /brokers/ids/0 is no longer reported as missing.
# zookeeper outputs the "Node does not exist" bit to stderr, hence the 2>&1.
# NOTE: the loop only continues while that exact message is seen; any other
# output (including other error output) ends the wait.
while [[ "$("$KAFKA_DIR"/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1)" == *"Node does not exist"* ]]; do
  echo "Waiting for broker..."
  sleep 1
done

# create the required topic
"$KAFKA_DIR"/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-input
# run the state machine example job (detached, with the original parallelism);
# the JobID is the last word of the client's "Job has been submitted" line
STATE_MACHINE_JOB=$("$FLINK_DIR"/bin/flink run -d -p "$ORIGINAL_DOP" "$FLINK_DIR"/examples/streaming/StateMachineExample.jar \
  --kafka-topic test-input \
  | grep "Job has been submitted with JobID" | sed 's/.* //g')
# An empty JobID means the submission failed; stop here instead of waiting
# for a job that does not exist.
if [ -z "$STATE_MACHINE_JOB" ]; then
  echo "Failed to submit the state machine job" >&2
  exit 1
fi
wait_job_running "$STATE_MACHINE_JOB"

# then, run the events generator, which writes to the same Kafka topic
EVENTS_GEN_JOB=$("$FLINK_DIR"/bin/flink run -d -c org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob "$FLINK_DIR"/examples/streaming/StateMachineExample.jar \
  --kafka-topic test-input --sleep 15 \
  | grep "Job has been submitted with JobID" | sed 's/.* //g')
if [ -z "$EVENTS_GEN_JOB" ]; then
  echo "Failed to submit the events generator job" >&2
  exit 1
fi
wait_job_running "$EVENTS_GEN_JOB"
# Prints the value of the most recently logged numRecordsIn sample of the
# state machine job's "Flat Map -> Sink: Print to Std. Out" operator
# (the ".0." component is presumably the subtask index - TODO confirm).
# Globals: FLINK_DIR (read) - samples are read from $FLINK_DIR/log/*taskexecutor*.log
# Outputs: the last word of the last matching log line; nothing if there is
#          no matching line yet
function get_metric_state_machine_processed_records {
  # -h keeps file names out of the output when several logs match the glob;
  # FLINK_DIR is quoted so that paths containing whitespace work.
  grep -h ".State machine job.Flat Map -> Sink: Print to Std. Out.0.numRecordsIn:" "$FLINK_DIR"/log/*taskexecutor*.log \
    | tail -n 1 \
    | sed 's/.* //'
}
# Prints how many numRecordsIn samples of the state machine job's
# "Flat Map -> Sink: Print to Std. Out" operator have been logged so far.
# Globals: FLINK_DIR (read) - samples are counted in $FLINK_DIR/log/*taskexecutor*.log
# Outputs: the number of matching log lines across all matching log files
function get_num_metric_samples {
  # grep -c would print one count per file, so the matching lines of all
  # files are piped into a single wc -l instead. FLINK_DIR is quoted so that
  # paths containing whitespace work.
  grep -h ".State machine job.Flat Map -> Sink: Print to Std. Out.0.numRecordsIn:" "$FLINK_DIR"/log/*taskexecutor*.log \
    | wc -l
}
# Poll the numRecordsIn metric of the state machine operator once per second
# and continue to the savepoint only after it reports at least 200 records.
while true; do
  NUM_RECORDS=$(get_metric_state_machine_processed_records)
  # no sample in the logs yet counts as zero processed records
  NUM_RECORDS=${NUM_RECORDS:-0}

  if ! (( $NUM_RECORDS < 200 )); then
    break
  fi

  echo "Waiting for state machine job to process up to 200 records, current progress: $NUM_RECORDS records ..."
  sleep 1
done
# take a savepoint of the state machine job; the path is the last word of the
# "Savepoint completed. Path:" line
SAVEPOINT_PATH=$(take_savepoint "$STATE_MACHINE_JOB" "$TEST_DATA_DIR" \
  | grep "Savepoint completed. Path:" | sed 's/.* //g')
# Without a path, `flink run -s` below would consume the next option as its
# argument, so fail right away if the savepoint did not complete.
if [ -z "$SAVEPOINT_PATH" ]; then
  echo "Failed to take a savepoint of job $STATE_MACHINE_JOB" >&2
  exit 1
fi

cancel_job "$STATE_MACHINE_JOB"

# The reporter output of the first and the second execution cannot be told apart,
# so remember how many metric samples the first execution logged; the monitoring
# of the second execution ignores them.
OLD_NUM_METRICS=$(get_num_metric_samples)

# resume state machine job with savepoint, using the new parallelism
STATE_MACHINE_JOB=$("$FLINK_DIR"/bin/flink run -s "$SAVEPOINT_PATH" -p "$NEW_DOP" -d "$FLINK_DIR"/examples/streaming/StateMachineExample.jar \
  --kafka-topic test-input \
  | grep "Job has been submitted with JobID" | sed 's/.* //g')
# An empty JobID means the resubmission failed; do not wait for a missing job.
if [ -z "$STATE_MACHINE_JOB" ]; then
  echo "Failed to resume the state machine job from savepoint $SAVEPOINT_PATH" >&2
  exit 1
fi
wait_job_running "$STATE_MACHINE_JOB"
# Poll the numRecordsIn metric of the state machine operator in the second
# (restored) execution once per second; the test may finish as soon as that
# execution has processed 200 records.
while true; do
  NUM_METRICS=$(get_num_metric_samples)
  NUM_RECORDS=$(get_metric_state_machine_processed_records)

  # As long as no new sample has been logged, the latest value still belongs
  # to the first execution and must not be counted.
  (( $OLD_NUM_METRICS >= $NUM_METRICS )) && NUM_RECORDS=0

  if ! (( $NUM_RECORDS < 200 )); then
    break
  fi

  echo "Waiting for state machine job to process up to 200 records, current progress: $NUM_RECORDS records ..."
  sleep 1
done
# if the state is erroneous and the state machine job produces alerting state transitions,
# the output will be non-empty and the test will not pass