This is a suite of pipelines inspired by the 'continuous data stream' queries in the [NEXMark benchmark](http://datalab.cs.pdx.edu/niagaraST/NEXMark/).
These are multiple queries over a three-entity model representing an online auction system:

- Person: a person submitting an item for auction and/or making a bid on an auction.
- Auction: an item under auction.
- Bid: a bid for an item under auction.
The queries exercise many aspects of the Beam model.
We have augmented the original queries with five more.
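For orientation, the three-entity auction model can be sketched as plain Java classes. This is a minimal illustration only: the class and field names below are simplified stand-ins, not the real model classes (which live in the `org.apache.beam.sdk.nexmark.model` package).

```java
// Minimal sketch of the three-entity NEXMark auction model. Names and
// fields are illustrative only, not the actual Beam model classes.
public class NexmarkModelSketch {
    static class Person {
        final long id;
        final String name;
        Person(long id, String name) { this.id = id; this.name = name; }
    }

    static class Auction {
        final long id;
        final long sellerId;  // references Person.id
        final long reserve;   // minimum acceptable price
        Auction(long id, long sellerId, long reserve) {
            this.id = id; this.sellerId = sellerId; this.reserve = reserve;
        }
    }

    static class Bid {
        final long auctionId; // references Auction.id
        final long bidderId;  // references Person.id
        final long price;
        Bid(long auctionId, long bidderId, long price) {
            this.auctionId = auctionId; this.bidderId = bidderId; this.price = price;
        }
    }

    public static void main(String[] args) {
        Person seller = new Person(1, "alice");
        Auction auction = new Auction(10, seller.id, 100);
        Bid bid = new Bid(auction.id, 2, 150);
        System.out.println(bid.price > auction.reserve); // prints true
    }
}
```

The queries join and aggregate over these relationships, e.g. matching bids to their auctions or finding the highest bid per auction window.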
We can specify the Beam runner to use with Maven profiles. The profiles used below are `direct-runner`, `spark-runner`, `flink-runner`, and `apex-runner`.

The runner must also be specified, as in any other Beam pipeline, using the `--runner` option.
Test data is deterministically synthesized on demand. The test data may be synthesized in the same pipeline as the query itself, or may be published to Pubsub.
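"Deterministically synthesized" means the generator derives each event purely from its configuration and index, so the same settings always reproduce the same stream. A minimal sketch of the idea (illustrative only; the real generator lives under the `org.apache.beam.sdk.nexmark.sources` package and is considerably richer):

```java
import java.util.Random;

// Sketch of deterministic on-demand synthesis: the n-th event is derived
// purely from (seed, n), with no state carried between calls, so any run
// can be reproduced without storing the data. Illustrative only.
public class DeterministicEvents {
    static String eventAt(long seed, long n) {
        // Mix the event index into the seed so each event gets its own,
        // but repeatable, pseudo-random draw.
        Random r = new Random(seed ^ (n * 0x9E3779B97F4A7C15L));
        int kind = r.nextInt(100);
        if (kind < 10) return "PERSON:" + n;       // occasional new person
        if (kind < 40) return "AUCTION:" + n;      // occasional new auction
        return "BID:" + n + ":" + r.nextInt(1000); // bids dominate the stream
    }

    public static void main(String[] args) {
        // Same (seed, index) always yields the same event.
        System.out.println(DeterministicEvents.eventAt(42, 7)
            .equals(DeterministicEvents.eventAt(42, 7))); // prints true
    }
}
```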
The query results may be checked, logged, or written out, depending on configuration.
Decide whether to run in batch or streaming mode:

```
--streaming=true
```

Number of event generators:

```
--numEventGenerators=4
```

Run query N:

```
--query=N
```

The suite to run can be chosen using this configuration parameter:

```
--suite=SUITE
```
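As a rough sketch of what `--numEventGenerators` controls (illustrative only, not the actual generator code): the global event sequence can be split into interleaved slices, so each generator produces a deterministic, disjoint share whose union is exactly the single-generator stream.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: splitting a global event sequence across N generators by
// interleaving indices. Generator i owns events i, i+N, i+2N, ...
// Illustrative only -- not the actual NEXMark sharding code.
public class GeneratorSharding {
    static List<Long> eventsFor(int generatorId, int numGenerators, long totalEvents) {
        List<Long> owned = new ArrayList<>();
        for (long n = generatorId; n < totalEvents; n += numGenerators) {
            owned.add(n);
        }
        return owned;
    }

    public static void main(String[] args) {
        // With 4 generators over 10 events, generator 1 owns events 1, 5, 9.
        System.out.println(eventsFor(1, 4, 10)); // prints [1, 5, 9]
    }
}
```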
Available suites are:

- DEFAULT: Test default configuration with query 0.
- SMOKE: Run the 12 default configurations.
- STRESS: Like SMOKE but with 1m events.
- FULL_THROTTLE: Like SMOKE but with 100m events.
Specific configuration for the Apex runner:

```
--manageResources=false --monitorJobs=false
```

Specific configuration for the Google Cloud Dataflow runner:

```
--manageResources=false --monitorJobs=true \
--enforceEncodability=false --enforceImmutability=false \
--project=<your project> \
--zone=<your zone> \
--workerMachineType=n1-highmem-8 \
--stagingLocation=<a gs path for staging> \
--runner=DataflowRunner \
--tempLocation=<a gs path for temporary files> \
--filesToStage=target/beam-sdks-java-nexmark-2.1.0-SNAPSHOT.jar
```

Specific configuration for the direct runner:

```
--manageResources=false --monitorJobs=true \
--enforceEncodability=false --enforceImmutability=false
```

Specific configuration for the Flink runner:

```
--manageResources=false --monitorJobs=true \
--flinkMaster=local --parallelism=#numcores
```

Specific configuration for the Spark runner:

```
--manageResources=false --monitorJobs=true \
--sparkMaster=local \
-Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true
```
Open issues are tracked in the Apache Beam JIRA (see the BEAM-* references in the tables below).
Batch / Synthetic / Local

Query | Direct | Spark | Flink | Apex |
---|---|---|---|---|
0 | ok | ok | ok | ok |
1 | ok | ok | ok | ok |
2 | ok | ok | ok | ok |
3 | ok | BEAM-1035 | ok | BEAM-1037 |
4 | ok | ok | ok | ok |
5 | ok | ok | ok | ok |
6 | ok | ok | ok | ok |
7 | ok | ok | ok | ok |
8 | ok | ok | ok | ok |
9 | ok | ok | ok | ok |
10 | ok | ok | ok | ok |
11 | ok | ok | ok | ok |
12 | ok | ok | ok | ok |
Streaming / Synthetic / Local

Query | Direct | Spark | Flink | Apex |
---|---|---|---|---|
0 | ok | ok | ok | ok |
1 | ok | ok | ok | ok |
2 | ok | ok | ok | ok |
3 | ok | BEAM-1035 | ok | BEAM-1037 |
4 | ok | ok | ok | ok |
5 | ok | ok | ok | ok |
6 | ok | ok | ok | ok |
7 | ok | BEAM-2112 | ok | ok |
8 | ok | ok | ok | ok |
9 | ok | ok | ok | ok |
10 | ok | ok | ok | ok |
11 | ok | ok | ok | ok |
12 | ok | ok | ok | ok |
Performance, Batch / Synthetic / Local: TODO
Query | Dataflow | Spark | Flink | Apex |
---|---|---|---|---|
0 |
Performance, Streaming / Synthetic / Local: TODO
Query | Dataflow | Spark | Flink | Apex |
---|---|---|---|---|
0 |
Running the SMOKE suite on the DirectRunner (local):

Batch Mode

```
mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=false --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true"
```

Streaming Mode

```
mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=true --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true"
```
Running the SMOKE suite on the SparkRunner (local):

Batch Mode

```
mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pspark-runner "-Dexec.args=--runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true"
```

Streaming Mode

```
mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pspark-runner "-Dexec.args=--runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=true"
```
Running the SMOKE suite on the FlinkRunner (local):

Batch Mode

```
mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pflink-runner "-Dexec.args=--runner=FlinkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true --flinkMaster=local"
```

Streaming Mode

```
mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pflink-runner "-Dexec.args=--runner=FlinkRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=true --flinkMaster=local"
```
Running the SMOKE suite on the ApexRunner (local):

Batch Mode

```
mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Papex-runner "-Dexec.args=--runner=ApexRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=false"
```

Streaming Mode

```
mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Papex-runner "-Dexec.args=--runner=ApexRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=false"
```
Running the SMOKE suite on the DataflowRunner (Google Cloud Dataflow service):

Building package

```
mvn clean package -Pdataflow-runner
```

Submit to the Google Dataflow service. The first command publishes synthetic events to Pubsub; the second runs the queries against that Pubsub stream:

```
java -cp sdks/java/nexmark/target/beam-sdks-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \
  org.apache.beam.sdk.nexmark.Main \
  --runner=DataflowRunner \
  --project=<your project> \
  --zone=<your zone> \
  --workerMachineType=n1-highmem-8 \
  --stagingLocation=<a gs path for staging> \
  --streaming=true \
  --sourceType=PUBSUB \
  --pubSubMode=PUBLISH_ONLY \
  --pubsubTopic=<an existing Pubsub topic> \
  --resourceNameMode=VERBATIM \
  --manageResources=false \
  --monitorJobs=false \
  --numEventGenerators=64 \
  --numWorkers=16 \
  --maxNumWorkers=16 \
  --suite=SMOKE \
  --firstEventRate=100000 \
  --nextEventRate=100000 \
  --ratePeriodSec=3600 \
  --isRateLimited=true \
  --avgPersonByteSize=500 \
  --avgAuctionByteSize=500 \
  --avgBidByteSize=500 \
  --probDelayedEvent=0.000001 \
  --occasionalDelaySec=3600 \
  --numEvents=0 \
  --useWallclockEventTime=true \
  --usePubsubPublishTime=true \
  --experiments=enable_custom_pubsub_sink
```

```
java -cp sdks/java/nexmark/target/beam-sdks-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \
  org.apache.beam.sdk.nexmark.Main \
  --runner=DataflowRunner \
  --project=<your project> \
  --zone=<your zone> \
  --workerMachineType=n1-highmem-8 \
  --stagingLocation=<a gs path for staging> \
  --streaming=true \
  --sourceType=PUBSUB \
  --pubSubMode=SUBSCRIBE_ONLY \
  --pubsubSubscription=<an existing Pubsub subscription to above topic> \
  --resourceNameMode=VERBATIM \
  --manageResources=false \
  --monitorJobs=false \
  --numWorkers=64 \
  --maxNumWorkers=64 \
  --suite=SMOKE \
  --usePubsubPublishTime=true \
  --outputPath=<a gs path under which log files will be written> \
  --windowSizeSec=600 \
  --occasionalDelaySec=3600 \
  --maxLogEvents=10000 \
  --experiments=enable_custom_pubsub_source
```
Running the SMOKE suite on a Spark cluster:

Building package

```
mvn clean package -Pspark-runner
```

Submit to the cluster

```
spark-submit \
  --master yarn-client \
  --class org.apache.beam.sdk.nexmark.Main \
  --driver-memory 512m \
  --executor-memory 512m \
  --executor-cores 1 \
  beam-sdks-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \
  --runner=SparkRunner --query=0 --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true
```