commit | f7afdfff97cd723b76b1ada4dd1f029a1ce44702 | [log] [tgz] |
---|---|---|
author | Alan Zhang <shuai.xyz@gmail.com> | Wed Jun 29 16:47:49 2022 -0700 |
committer | GitHub <noreply@github.com> | Wed Jun 29 16:47:49 2022 -0700 |
tree | 08db728443ec6355e59ce5b00a9a8ac17fe0f8ac | |
parent | 493de64bb8c0228bbda11822ae3c84c2085263bf [diff] |
SAMZA-2749: Startpoint bug fix (#1615) Symptom: Using startpoints to trigger full bootstrapping is not reliable in the current implementation, we observed that the bootstrapping only happened on the part of expected partitions. Cause: Within Samza (the main class to pay attention to is OffsetManager.scala), there is a bug in which a startpoint can be deleted before the startpoint actually gets used for message consumption. If a container gets into this situation, then the result is that the startpoint is ignored and consumption will continue from the previous processed message from before the startpoint was applied. Load last processed offsets and startpoints Use startpoints to register starting offsets for consumers Message processing starts, but messages for only some of the partitions are received Write checkpoint using last processed offsets If a partition did not get messages, then the last processed offset is still the offset from before the standpoint. Delete startpoints Container dies (e.g. due to running out of memory) On restart, load last processed offsets (startpoints have been deleted) The partitions that did have messages in the previous deployment will have the correct checkpoint. The partitions that did not have messages will have the checkpoint set to the offset from before the startpoint was applied. This is unexpected, and it means that bootstrapping is not happening for this partition. Changes: Keep track of the partitions which have updated processed offsets, and only delete the startpoint for those partitions after checkpointing. API Changes: Added a new API removeFanOutForTaskSSPs in StartpointManager to allow clean up the fan outs on partition granularity
Apache Samza is a distributed stream processing framework. It uses Apache Kafka for messaging, and Apache Hadoop YARN to provide fault tolerance, processor isolation, security, and resource management.
Samza's key features include:
Check out Hello Samza to try Samza. Read the Background page to learn more about Samza.
To build Samza from a git checkout, run:
./gradlew clean build
To build Samza from a source release, it is first necessary to download the gradle wrapper script above. This bootstrapping process requires Gradle to be installed on the source machine. Gradle is available through most package managers or directly from its website. To bootstrap the wrapper, run:
gradle -b bootstrap.gradle
After the bootstrap script has completed, the regular gradlew instructions below are available.
Samza builds with Scala 2.11 or 2.12 and YARN 2.6.1, by default. Use the -PscalaSuffix switches to change Scala versions. Samza supports building Scala with 2.11 and 2.12.
./gradlew -PscalaSuffix=2.11 clean build
To run all tests:
./gradlew clean test
To run a single test:
./gradlew clean :samza-test:test -Dtest.single=TestStatefulTask
To run key-value performance tests:
./gradlew samza-shell:kvPerformanceTest -PconfigPath=file://$PWD/samza-test/src/main/config/perf/kv-perf.properties
To run yarn integration tests:
./bin/integration-tests.sh <dir> yarn-integration-tests
To run standalone integration tests:
./bin/integration-tests.sh <dir> standalone-integration-tests
./gradlew checkstyleMain checkstyleTest
To run a job (defined in a properties file):
./gradlew samza-shell:runJob -PconfigPath=/path/to/job/config.properties
To inspect a job's latest checkpoint:
./gradlew samza-shell:checkpointTool -PconfigPath=/path/to/job/config.properties
To modify a job's checkpoint (assumes that the job is not currently running), give it a file with the new offset for each partition, in the format systems.<system>.streams.<topic>.partitions.<partition>=<offset>
:
./gradlew samza-shell:checkpointTool -PconfigPath=/path/to/job/config.properties \ -PnewOffsets=file:///path/to/new/offsets.properties
To get Eclipse projects, run:
./gradlew eclipse
For IntelliJ, run:
./gradlew idea
To start contributing on Samza please read Rules and Contributor Corner. Notice that Samza git repository does not support git pull request.
Apache Samza is a top level project of the Apache Software Foundation.