/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.nexmark;
import static org.apache.beam.sdk.nexmark.NexmarkUtils.PubSubMode.COMBINED;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.nexmark.NexmarkUtils.PubSubMode;
import org.apache.beam.sdk.nexmark.NexmarkUtils.SourceType;
import org.apache.beam.sdk.nexmark.model.Auction;
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.nexmark.model.KnownSize;
import org.apache.beam.sdk.nexmark.model.Person;
import org.apache.beam.sdk.nexmark.queries.BoundedSideInputJoin;
import org.apache.beam.sdk.nexmark.queries.BoundedSideInputJoinModel;
import org.apache.beam.sdk.nexmark.queries.NexmarkQuery;
import org.apache.beam.sdk.nexmark.queries.NexmarkQueryModel;
import org.apache.beam.sdk.nexmark.queries.NexmarkQueryUtil;
import org.apache.beam.sdk.nexmark.queries.Query0;
import org.apache.beam.sdk.nexmark.queries.Query0Model;
import org.apache.beam.sdk.nexmark.queries.Query1;
import org.apache.beam.sdk.nexmark.queries.Query10;
import org.apache.beam.sdk.nexmark.queries.Query11;
import org.apache.beam.sdk.nexmark.queries.Query12;
import org.apache.beam.sdk.nexmark.queries.Query1Model;
import org.apache.beam.sdk.nexmark.queries.Query2;
import org.apache.beam.sdk.nexmark.queries.Query2Model;
import org.apache.beam.sdk.nexmark.queries.Query3;
import org.apache.beam.sdk.nexmark.queries.Query3Model;
import org.apache.beam.sdk.nexmark.queries.Query4;
import org.apache.beam.sdk.nexmark.queries.Query4Model;
import org.apache.beam.sdk.nexmark.queries.Query5;
import org.apache.beam.sdk.nexmark.queries.Query5Model;
import org.apache.beam.sdk.nexmark.queries.Query6;
import org.apache.beam.sdk.nexmark.queries.Query6Model;
import org.apache.beam.sdk.nexmark.queries.Query7;
import org.apache.beam.sdk.nexmark.queries.Query7Model;
import org.apache.beam.sdk.nexmark.queries.Query8;
import org.apache.beam.sdk.nexmark.queries.Query8Model;
import org.apache.beam.sdk.nexmark.queries.Query9;
import org.apache.beam.sdk.nexmark.queries.Query9Model;
import org.apache.beam.sdk.nexmark.queries.SessionSideInputJoin;
import org.apache.beam.sdk.nexmark.queries.SessionSideInputJoinModel;
import org.apache.beam.sdk.nexmark.queries.sql.SqlBoundedSideInputJoin;
import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery0;
import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery1;
import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery2;
import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery3;
import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery7;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.LoggerFactory;
/** Run a single Nexmark query using a given configuration. */
public class NexmarkLauncher<OptionT extends NexmarkOptions> {
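/*
* Typical use (an illustrative sketch only; assumes the options were parsed elsewhere
* and a NexmarkConfiguration was built for the desired query):
*
*   NexmarkOptions opts =
*       PipelineOptionsFactory.fromArgs(args).withValidation().as(NexmarkOptions.class);
*   NexmarkLauncher<NexmarkOptions> launcher = new NexmarkLauncher<>(opts, configuration);
*   NexmarkPerf perf = launcher.run(); // may be null if the configuration is skipped
*/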
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(NexmarkLauncher.class);
/** Command line parameter value for query language. */
private static final String SQL = "sql";
/** Minimum number of samples needed for 'steady-state' rate calculation. */
private static final int MIN_SAMPLES = 9;
/** Minimum length of time over which to consider samples for 'steady-state' rate calculation. */
private static final Duration MIN_WINDOW = Duration.standardMinutes(2);
/** Delay between perf samples. */
private static final Duration PERF_DELAY = Duration.standardSeconds(15);
/**
* How long to let streaming pipeline run after all events have been generated and we've seen no
* activity.
*/
private static final Duration DONE_DELAY = Duration.standardMinutes(1);
/** How long to allow no activity at sources and sinks without warning. */
private static final Duration STUCK_WARNING_DELAY = Duration.standardMinutes(10);
/**
* How long to let the streaming pipeline run after seeing no activity at sources or sinks, even
* if not all events have been generated.
*/
private static final Duration STUCK_TERMINATE_DELAY = Duration.standardHours(1);
/** NexmarkOptions for this run. */
private final OptionT options;
/** Which configuration we are running. */
private NexmarkConfiguration configuration;
/** If in --pubsubMode=COMBINED, the event monitor for the publisher pipeline. Otherwise null. */
@Nullable private Monitor<Event> publisherMonitor;
/**
* If in --pubsubMode=COMBINED, the pipeline result for the publisher pipeline. Otherwise null.
*/
@Nullable private PipelineResult publisherResult;
/** Result for the main pipeline. */
@Nullable private PipelineResult mainResult;
/** Query name we are running. */
@Nullable private String queryName;
/** Full path of the PubSub topic (when PubSub is enabled). */
@Nullable private String pubsubTopic;
/** Full path of the PubSub subscription (when PubSub is enabled). */
@Nullable private String pubsubSubscription;
@Nullable private PubsubHelper pubsubHelper;
public NexmarkLauncher(OptionT options, NexmarkConfiguration configuration) {
this.options = options;
this.configuration = configuration;
}
/** Is this query running in streaming mode? */
private boolean isStreaming() {
return options.isStreaming();
}
/** Return the maximum number of workers (hard-coded; used by Query10 when sharding log files). */
private int maxNumWorkers() {
return 5;
}
/**
* Find a 'steady state' events/sec from {@code snapshots} and store it in {@code perf} if found.
*/
private void captureSteadyState(NexmarkPerf perf, List<NexmarkPerf.ProgressSnapshot> snapshots) {
if (!options.isStreaming()) {
return;
}
// Find the first sample with actual event and result counts.
int dataStart = 0;
for (; dataStart < snapshots.size(); dataStart++) {
if (snapshots.get(dataStart).numEvents >= 0 && snapshots.get(dataStart).numResults >= 0) {
break;
}
}
// Find the last sample which demonstrated progress.
int dataEnd = snapshots.size() - 1;
for (; dataEnd > dataStart; dataEnd--) {
if (snapshots.get(dataEnd).anyActivity(snapshots.get(dataEnd - 1))) {
break;
}
}
int numSamples = dataEnd - dataStart + 1;
if (numSamples < MIN_SAMPLES) {
// Not enough samples.
NexmarkUtils.console(
"%d samples not enough to calculate steady-state event rate", numSamples);
return;
}
// We'll look only at the middle third of the samples.
int sampleStart = dataStart + numSamples / 3;
int sampleEnd = dataEnd - numSamples / 3;
double sampleSec =
snapshots.get(sampleEnd).secSinceStart - snapshots.get(sampleStart).secSinceStart;
if (sampleSec < MIN_WINDOW.getStandardSeconds()) {
// Not sampled over enough time.
NexmarkUtils.console(
"sample of %.1f sec not long enough to calculate steady-state event rate", sampleSec);
return;
}
// Find rate with least squares error.
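// We model cumulative events as y ~= rate * x (a line through the origin), where
// x is the effective runtime in seconds; the least-squares slope of such a fit is
// sum(x*y) / sum(x*x).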
double sumxx = 0.0;
double sumxy = 0.0;
long prevNumEvents = -1;
for (int i = sampleStart; i <= sampleEnd; i++) {
if (prevNumEvents == snapshots.get(i).numEvents) {
// Skip samples with no change in number of events since they contribute no data.
continue;
}
// Use the effective runtime instead of wallclock time so we can
// insulate ourselves from delays and stutters in the query manager.
double x = snapshots.get(i).runtimeSec;
prevNumEvents = snapshots.get(i).numEvents;
double y = prevNumEvents;
sumxx += x * x;
sumxy += x * y;
}
double eventsPerSec = sumxy / sumxx;
NexmarkUtils.console("revising events/sec from %.1f to %.1f", perf.eventsPerSec, eventsPerSec);
perf.eventsPerSec = eventsPerSec;
}
/** Return the current performance given {@code eventMonitor} and {@code resultMonitor}. */
private NexmarkPerf currentPerf(
long startMsSinceEpoch,
long now,
PipelineResult result,
List<NexmarkPerf.ProgressSnapshot> snapshots,
Monitor<?> eventMonitor,
Monitor<?> resultMonitor) {
NexmarkPerf perf = new NexmarkPerf();
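// Monitor publishes metrics under the monitor's name; the counters and time
// metrics read below are keyed "<prefix>.elements", "<prefix>.bytes",
// "<prefix>.startTime" and "<prefix>.endTime".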
MetricsReader eventMetrics = new MetricsReader(result, eventMonitor.name);
long numEvents = eventMetrics.getCounterMetric(eventMonitor.prefix + ".elements");
long numEventBytes = eventMetrics.getCounterMetric(eventMonitor.prefix + ".bytes");
long eventStart = eventMetrics.getStartTimeMetric(eventMonitor.prefix + ".startTime");
long eventEnd = eventMetrics.getEndTimeMetric(eventMonitor.prefix + ".endTime");
MetricsReader resultMetrics = new MetricsReader(result, resultMonitor.name);
long numResults = resultMetrics.getCounterMetric(resultMonitor.prefix + ".elements");
long numResultBytes = resultMetrics.getCounterMetric(resultMonitor.prefix + ".bytes");
long resultStart = resultMetrics.getStartTimeMetric(resultMonitor.prefix + ".startTime");
long resultEnd = resultMetrics.getEndTimeMetric(resultMonitor.prefix + ".endTime");
long timestampStart =
resultMetrics.getStartTimeMetric(resultMonitor.prefix + ".startTimestamp");
long timestampEnd = resultMetrics.getEndTimeMetric(resultMonitor.prefix + ".endTimestamp");
long effectiveEnd = -1;
if (eventEnd >= 0 && resultEnd >= 0) {
// It is possible for events to be generated after the last result was emitted.
// (E.g. Query 2, which only yields results for a small prefix of the event stream.)
// So use the max of last event and last result times.
effectiveEnd = Math.max(eventEnd, resultEnd);
} else if (resultEnd >= 0) {
effectiveEnd = resultEnd;
} else if (eventEnd >= 0) {
// During startup we may have no result yet, but we would still like to track how
// long the pipeline has been running.
effectiveEnd = eventEnd;
}
if (effectiveEnd >= 0 && eventStart >= 0 && effectiveEnd >= eventStart) {
perf.runtimeSec = (effectiveEnd - eventStart) / 1000.0;
}
if (numEvents >= 0) {
perf.numEvents = numEvents;
}
if (numEvents >= 0 && perf.runtimeSec > 0.0) {
// For streaming we may later replace this with a 'steady-state' value calculated
// from the progress snapshots.
perf.eventsPerSec = numEvents / perf.runtimeSec;
}
if (numEventBytes >= 0 && perf.runtimeSec > 0.0) {
perf.eventBytesPerSec = numEventBytes / perf.runtimeSec;
}
if (numResults >= 0) {
perf.numResults = numResults;
}
if (numResults >= 0 && perf.runtimeSec > 0.0) {
perf.resultsPerSec = numResults / perf.runtimeSec;
}
if (numResultBytes >= 0 && perf.runtimeSec > 0.0) {
perf.resultBytesPerSec = numResultBytes / perf.runtimeSec;
}
if (eventStart >= 0) {
perf.startupDelaySec = (eventStart - startMsSinceEpoch) / 1000.0;
}
if (resultStart >= 0 && eventStart >= 0 && resultStart >= eventStart) {
perf.processingDelaySec = (resultStart - eventStart) / 1000.0;
}
if (timestampStart >= 0 && timestampEnd >= 0 && perf.runtimeSec > 0.0) {
double eventRuntimeSec = (timestampEnd - timestampStart) / 1000.0;
perf.timeDilation = eventRuntimeSec / perf.runtimeSec;
}
if (resultEnd >= 0) {
// Fill in the shutdown delay assuming the job has now finished.
perf.shutdownDelaySec = (now - resultEnd) / 1000.0;
}
// Record a progress snapshot at this point so a steady-state rate can be derived later.
NexmarkPerf.ProgressSnapshot snapshot = new NexmarkPerf.ProgressSnapshot();
snapshot.secSinceStart = (now - startMsSinceEpoch) / 1000.0;
snapshot.runtimeSec = perf.runtimeSec;
snapshot.numEvents = numEvents;
snapshot.numResults = numResults;
snapshots.add(snapshot);
captureSteadyState(perf, snapshots);
return perf;
}
/** Build and run a pipeline using specified options. */
interface PipelineBuilder<OptionT extends NexmarkOptions> {
void build(OptionT publishOnlyOptions);
}
/** Invoke the builder with options suitable for running a publish-only child pipeline. */
private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder<NexmarkOptions> builder) {
String jobName = options.getJobName();
String appName = options.getAppName();
int numWorkers = options.getNumWorkers();
int maxNumWorkers = options.getMaxNumWorkers();
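// Prefix the job and app names so the publish-only pipeline can be told apart
// from the main pipeline.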
options.setJobName("p-" + jobName);
options.setAppName("p-" + appName);
int eventGeneratorWorkers = configuration.numEventGenerators;
// TODO: assign one generator per core rather than one per worker.
if (numWorkers > 0 && eventGeneratorWorkers > 0) {
options.setNumWorkers(Math.min(numWorkers, eventGeneratorWorkers));
}
if (maxNumWorkers > 0 && eventGeneratorWorkers > 0) {
options.setMaxNumWorkers(Math.min(maxNumWorkers, eventGeneratorWorkers));
}
try {
builder.build(options);
} finally {
options.setJobName(jobName);
options.setAppName(appName);
options.setNumWorkers(numWorkers);
options.setMaxNumWorkers(maxNumWorkers);
}
}
/**
* Monitor the performance and progress of a running job. Return final performance if it was
* measured.
*/
@Nullable
private NexmarkPerf monitor(NexmarkQuery query) {
if (!options.getMonitorJobs()) {
return null;
}
if (configuration.debug) {
NexmarkUtils.console("Waiting for main pipeline to 'finish'");
} else {
NexmarkUtils.console("--debug=false, so job will not self-cancel");
}
PipelineResult job = mainResult;
PipelineResult publisherJob = publisherResult;
List<NexmarkPerf.ProgressSnapshot> snapshots = new ArrayList<>();
long startMsSinceEpoch = System.currentTimeMillis();
long endMsSinceEpoch = -1;
if (options.getRunningTimeMinutes() != null) {
endMsSinceEpoch =
startMsSinceEpoch
+ Duration.standardMinutes(options.getRunningTimeMinutes()).getMillis()
- Duration.standardSeconds(configuration.preloadSeconds).getMillis();
}
long lastActivityMsSinceEpoch = -1;
NexmarkPerf perf = null;
boolean waitingForShutdown = false;
boolean cancelJob = false;
boolean publisherCancelled = false;
List<String> errors = new ArrayList<>();
while (true) {
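// Poll the job roughly every PERF_DELAY, snapshotting progress and deciding
// whether it has finished, failed, gone quiet, or needs cancelling.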
long now = System.currentTimeMillis();
if (endMsSinceEpoch >= 0 && now > endMsSinceEpoch && !waitingForShutdown) {
NexmarkUtils.console("Reached end of test, cancelling job");
try {
cancelJob = true;
job.cancel();
} catch (IOException e) {
throw new RuntimeException("Unable to cancel main job: ", e);
}
if (publisherResult != null) {
try {
publisherJob.cancel();
} catch (IOException e) {
throw new RuntimeException("Unable to cancel publisher job: ", e);
}
publisherCancelled = true;
}
waitingForShutdown = true;
}
PipelineResult.State state = job.getState();
NexmarkUtils.console(
"%s %s%s", state, queryName, waitingForShutdown ? " (waiting for shutdown)" : "");
NexmarkPerf currPerf;
if (configuration.debug) {
currPerf =
currentPerf(
startMsSinceEpoch, now, job, snapshots, query.eventMonitor, query.resultMonitor);
} else {
currPerf = null;
}
if (perf == null || perf.anyActivity(currPerf)) {
lastActivityMsSinceEpoch = now;
}
if (options.isStreaming() && !waitingForShutdown) {
Duration quietFor = new Duration(lastActivityMsSinceEpoch, now);
long fatalCount = new MetricsReader(job, query.getName()).getCounterMetric("fatal");
if (fatalCount == -1) {
fatalCount = 0;
}
if (fatalCount > 0) {
NexmarkUtils.console("ERROR: job has fatal errors, cancelling.");
errors.add(String.format("Pipeline reported %s fatal errors", fatalCount));
waitingForShutdown = true;
cancelJob = true;
} else if (configuration.debug
&& configuration.numEvents > 0
&& currPerf.numEvents == configuration.numEvents
&& currPerf.numResults >= 0
&& quietFor.isLongerThan(DONE_DELAY)) {
NexmarkUtils.console("streaming query appears to have finished waiting for completion.");
waitingForShutdown = true;
} else if (quietFor.isLongerThan(STUCK_TERMINATE_DELAY)) {
NexmarkUtils.console(
"ERROR: streaming query appears to have been stuck for %d minutes, cancelling job.",
quietFor.getStandardMinutes());
errors.add(
String.format(
"Cancelling streaming job since it appeared stuck for %d min.",
quietFor.getStandardMinutes()));
waitingForShutdown = true;
cancelJob = true;
} else if (quietFor.isLongerThan(STUCK_WARNING_DELAY)) {
NexmarkUtils.console(
"WARNING: streaming query appears to have been stuck for %d min.",
quietFor.getStandardMinutes());
}
if (cancelJob) {
try {
job.cancel();
} catch (IOException e) {
throw new RuntimeException("Unable to cancel main job: ", e);
}
}
}
perf = currPerf;
boolean running = true;
switch (state) {
case UNKNOWN:
case STOPPED:
case RUNNING:
// Keep going.
break;
case DONE:
// All done.
running = false;
break;
case CANCELLED:
running = false;
if (!cancelJob) {
errors.add("Job was unexpectedly cancelled");
}
break;
case FAILED:
// Abnormal termination.
running = false;
errors.add("Job failed unexpectedly");
break;
case UPDATED:
// Abnormal termination.
running = false;
errors.add("Job was unexpectedly updated");
break;
}
if (!running) {
break;
}
if (lastActivityMsSinceEpoch == now) {
NexmarkUtils.console("new perf %s", perf);
} else {
NexmarkUtils.console("no activity");
}
try {
Thread.sleep(PERF_DELAY.getMillis());
} catch (InterruptedException e) {
Thread.interrupted();
NexmarkUtils.console("Interrupted: pipeline is still running");
}
}
if (perf != null) {
// perf can be null when --debug=false, since no perf samples are taken.
perf.errors = errors;
perf.snapshots = snapshots;
}
if (publisherResult != null) {
NexmarkUtils.console("Shutting down publisher pipeline.");
try {
if (!publisherCancelled) {
publisherJob.cancel();
}
publisherJob.waitUntilFinish(Duration.standardMinutes(5));
} catch (IOException e) {
throw new RuntimeException("Unable to cancel publisher job: ", e);
}
}
return perf;
}
// ================================================================================
// Basic sources and sinks
// ================================================================================
/** Return a topic name. */
private String shortTopic(long now) {
String baseTopic = options.getPubsubTopic();
if (Strings.isNullOrEmpty(baseTopic)) {
throw new RuntimeException("Missing --pubsubTopic");
}
switch (options.getResourceNameMode()) {
case VERBATIM:
return baseTopic;
case QUERY:
return String.format("%s_%s_source", baseTopic, queryName);
case QUERY_AND_SALT:
return String.format("%s_%s_%d_source", baseTopic, queryName, now);
case QUERY_RUNNER_AND_MODE:
return String.format(
"%s_%s_%s_%s_source",
baseTopic, queryName, options.getRunner().getSimpleName(), options.isStreaming());
}
throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
}
/** Return a subscription name. */
private String shortSubscription(long now) {
String baseSubscription = options.getPubsubSubscription();
if (Strings.isNullOrEmpty(baseSubscription)) {
throw new RuntimeException("Missing --pubsubSubscription");
}
switch (options.getResourceNameMode()) {
case VERBATIM:
return baseSubscription;
case QUERY:
return String.format("%s_%s_source", baseSubscription, queryName);
case QUERY_AND_SALT:
return String.format("%s_%s_%d_source", baseSubscription, queryName, now);
case QUERY_RUNNER_AND_MODE:
return String.format(
"%s_%s_%s_%s_source",
baseSubscription,
queryName,
options.getRunner().getSimpleName(),
options.isStreaming());
}
throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
}
/** Return a file name for plain text. */
private String textFilename(long now) {
String baseFilename = options.getOutputPath();
if (Strings.isNullOrEmpty(baseFilename)) {
throw new RuntimeException("Missing --outputPath");
}
switch (options.getResourceNameMode()) {
case VERBATIM:
return baseFilename;
case QUERY:
return String.format("%s/nexmark_%s.txt", baseFilename, queryName);
case QUERY_AND_SALT:
return String.format("%s/nexmark_%s_%d.txt", baseFilename, queryName, now);
case QUERY_RUNNER_AND_MODE:
return String.format(
"%s/nexmark_%s_%s_%s",
baseFilename, queryName, options.getRunner().getSimpleName(), options.isStreaming());
}
throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
}
/** Return a directory for logs. */
private String logsDir(long now) {
String baseFilename = options.getOutputPath();
if (Strings.isNullOrEmpty(baseFilename)) {
throw new RuntimeException("Missing --outputPath");
}
switch (options.getResourceNameMode()) {
case VERBATIM:
return baseFilename;
case QUERY:
return String.format("%s/logs_%s", baseFilename, queryName);
case QUERY_AND_SALT:
return String.format("%s/logs_%s_%d", baseFilename, queryName, now);
case QUERY_RUNNER_AND_MODE:
return String.format(
"%s/logs_%s_%s_%s",
baseFilename, queryName, options.getRunner().getSimpleName(), options.isStreaming());
}
throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
}
/** Return a source of synthetic events. */
private PCollection<Event> sourceEventsFromSynthetic(Pipeline p) {
if (isStreaming()) {
NexmarkUtils.console("Generating %d events in streaming mode", configuration.numEvents);
return p.apply(queryName + ".ReadUnbounded", NexmarkUtils.streamEventsSource(configuration));
} else {
NexmarkUtils.console("Generating %d events in batch mode", configuration.numEvents);
return p.apply(queryName + ".ReadBounded", NexmarkUtils.batchEventsSource(configuration));
}
}
/** Return source of events from Pubsub. */
private PCollection<Event> sourceEventsFromPubsub(Pipeline p) {
NexmarkUtils.console("Reading events from Pubsub %s", pubsubSubscription);
PubsubIO.Read<PubsubMessage> io =
PubsubIO.readMessagesWithAttributes()
.fromSubscription(pubsubSubscription)
.withIdAttribute(NexmarkUtils.PUBSUB_ID);
if (!configuration.usePubsubPublishTime) {
io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
}
return p.apply(queryName + ".ReadPubsubEvents", io)
.apply(queryName + ".PubsubMessageToEvent", ParDo.of(new PubsubMessageEventDoFn()));
}
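// Events travel through Kafka as raw bytes produced by Event.CODER; the matching
// BYTEARRAY_TO_EVENT DoFn below reverses the encoding on the consumer side.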
static final DoFn<Event, byte[]> EVENT_TO_BYTEARRAY =
new DoFn<Event, byte[]>() {
@ProcessElement
public void processElement(ProcessContext c) throws IOException {
byte[] encodedEvent = CoderUtils.encodeToByteArray(Event.CODER, c.element());
c.output(encodedEvent);
}
};
/** Send {@code events} to Kafka. */
private void sinkEventsToKafka(PCollection<Event> events) {
checkArgument((options.getBootstrapServers() != null), "Missing --bootstrapServers");
NexmarkUtils.console("Writing events to Kafka Topic %s", options.getKafkaTopic());
PCollection<byte[]> eventToBytes = events.apply("Event to bytes", ParDo.of(EVENT_TO_BYTEARRAY));
eventToBytes.apply(
KafkaIO.<Void, byte[]>write()
.withBootstrapServers(options.getBootstrapServers())
.withTopic(options.getKafkaTopic())
.withValueSerializer(ByteArraySerializer.class)
.values());
}
static final DoFn<KV<Long, byte[]>, Event> BYTEARRAY_TO_EVENT =
new DoFn<KV<Long, byte[]>, Event>() {
@ProcessElement
public void processElement(ProcessContext c) throws IOException {
byte[] encodedEvent = c.element().getValue();
Event event = CoderUtils.decodeFromByteArray(Event.CODER, encodedEvent);
c.output(event);
}
};
/** Return source of events from Kafka. */
private PCollection<Event> sourceEventsFromKafka(Pipeline p, final Instant now) {
checkArgument((options.getBootstrapServers() != null), "Missing --bootstrapServers");
NexmarkUtils.console("Reading events from Kafka Topic %s", options.getKafkaTopic());
KafkaIO.Read<Long, byte[]> read =
KafkaIO.<Long, byte[]>read()
.withBootstrapServers(options.getBootstrapServers())
.withTopic(options.getKafkaTopic())
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class)
.withStartReadTime(now)
.withMaxNumRecords(
options.getNumEvents() != null ? options.getNumEvents() : Long.MAX_VALUE);
return p.apply(queryName + ".ReadKafkaEvents", read.withoutMetadata())
.apply(queryName + ".KafkaToEvents", ParDo.of(BYTEARRAY_TO_EVENT));
}
/** Return Avro source of events from {@code options.getInputFilePrefix}. */
private PCollection<Event> sourceEventsFromAvro(Pipeline p) {
String filename = options.getInputPath();
if (Strings.isNullOrEmpty(filename)) {
throw new RuntimeException("Missing --inputPath");
}
NexmarkUtils.console("Reading events from Avro files at %s", filename);
return p.apply(
queryName + ".ReadAvroEvents", AvroIO.read(Event.class).from(filename + "*.avro"))
.apply("OutputWithTimestamp", NexmarkQueryUtil.EVENT_TIMESTAMP_FROM_DATA);
}
/** Send {@code events} to Pubsub. */
private void sinkEventsToPubsub(PCollection<Event> events) {
checkState(pubsubTopic != null, "Pubsub topic needs to be set up before initializing sink");
NexmarkUtils.console("Writing events to Pubsub %s", pubsubTopic);
PubsubIO.Write<PubsubMessage> io =
PubsubIO.writeMessages().to(pubsubTopic).withIdAttribute(NexmarkUtils.PUBSUB_ID);
if (!configuration.usePubsubPublishTime) {
io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
}
events
.apply(queryName + ".EventToPubsubMessage", ParDo.of(new EventPubsubMessageDoFn()))
.apply(queryName + ".WritePubsubEvents", io);
}
/** Send {@code formattedResults} to Kafka. */
private void sinkResultsToKafka(PCollection<String> formattedResults) {
checkArgument((options.getBootstrapServers() != null), "Missing --bootstrapServers");
NexmarkUtils.console("Writing results to Kafka Topic %s", options.getKafkaResultsTopic());
formattedResults.apply(
queryName + ".WriteKafkaResults",
KafkaIO.<Void, String>write()
.withBootstrapServers(options.getBootstrapServers())
.withTopic(options.getKafkaResultsTopic())
.withValueSerializer(StringSerializer.class)
.values());
}
/** Send {@code formattedResults} to Pubsub. */
private void sinkResultsToPubsub(PCollection<String> formattedResults, long now) {
String shortTopic = shortTopic(now);
NexmarkUtils.console("Writing results to Pubsub %s", shortTopic);
PubsubIO.Write<String> io =
PubsubIO.writeStrings().to(shortTopic).withIdAttribute(NexmarkUtils.PUBSUB_ID);
if (!configuration.usePubsubPublishTime) {
io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
}
formattedResults.apply(queryName + ".WritePubsubResults", io);
}
/**
* Sink all raw Events in {@code source} to {@code options.getOutputPath}. This will configure the
* job to write the following files:
*
* <ul>
* <li>{@code $outputPath/event*.avro} All Event entities.
* <li>{@code $outputPath/auction*.avro} Auction entities.
* <li>{@code $outputPath/bid*.avro} Bid entities.
* <li>{@code $outputPath/person*.avro} Person entities.
* </ul>
*
* @param source A PCollection of events.
*/
private void sinkEventsToAvro(PCollection<Event> source) {
String filename = options.getOutputPath();
if (Strings.isNullOrEmpty(filename)) {
throw new RuntimeException("Missing --outputPath");
}
NexmarkUtils.console("Writing events to Avro files at %s", filename);
source.apply(
queryName + ".WriteAvroEvents",
AvroIO.write(Event.class).to(filename + "/event").withSuffix(".avro"));
source
.apply(NexmarkQueryUtil.JUST_BIDS)
.apply(
queryName + ".WriteAvroBids",
AvroIO.write(Bid.class).to(filename + "/bid").withSuffix(".avro"));
source
.apply(NexmarkQueryUtil.JUST_NEW_AUCTIONS)
.apply(
queryName + ".WriteAvroAuctions",
AvroIO.write(Auction.class).to(filename + "/auction").withSuffix(".avro"));
source
.apply(NexmarkQueryUtil.JUST_NEW_PERSONS)
.apply(
queryName + ".WriteAvroPeople",
AvroIO.write(Person.class).to(filename + "/person").withSuffix(".avro"));
}
/** Send {@code formattedResults} to text files. */
private void sinkResultsToText(PCollection<String> formattedResults, long now) {
String filename = textFilename(now);
NexmarkUtils.console("Writing results to text files at %s", filename);
formattedResults.apply(queryName + ".WriteTextResults", TextIO.write().to(filename));
}
private static class StringToTableRow extends DoFn<String, TableRow> {
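// Wraps each result string in a TableRow together with a random number (0-9) of
// nested records, so the output exercises BigQuery's repeated RECORD fields.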
@ProcessElement
public void processElement(ProcessContext c) {
int n = ThreadLocalRandom.current().nextInt(10);
List<TableRow> records = new ArrayList<>(n);
for (int i = 0; i < n; i++) {
records.add(new TableRow().set("index", i).set("value", Integer.toString(i)));
}
c.output(new TableRow().set("result", c.element()).set("records", records));
}
}
/** Send {@code formattedResults} to BigQuery. */
private void sinkResultsToBigQuery(
PCollection<String> formattedResults, long now, String version) {
String tableSpec = NexmarkUtils.tableSpec(options, queryName, now, version);
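// The schema must match the rows produced by StringToTableRow above: a "result"
// string plus a repeated "records" RECORD of (index, value) pairs.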
TableSchema tableSchema =
new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("result").setType("STRING"),
new TableFieldSchema()
.setName("records")
.setMode("REPEATED")
.setType("RECORD")
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("index").setType("INTEGER"),
new TableFieldSchema().setName("value").setType("STRING")))));
NexmarkUtils.console("Writing results to BigQuery table %s", tableSpec);
BigQueryIO.Write io =
BigQueryIO.write()
.to(tableSpec)
.withSchema(tableSchema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
formattedResults
.apply(queryName + ".StringToTableRow", ParDo.of(new StringToTableRow()))
.apply(queryName + ".WriteBigQueryResults", io);
}
/** Creates or reuses PubSub topics and subscriptions as configured. */
private void setupPubSubResources(long now) throws IOException {
String shortTopic = shortTopic(now);
String shortSubscription = shortSubscription(now);
if (!options.getManageResources() || configuration.pubSubMode == PubSubMode.SUBSCRIBE_ONLY) {
// The topic should already have been created by the user or
// a companion 'PUBLISH_ONLY' process.
pubsubTopic = pubsubHelper.reuseTopic(shortTopic).getPath();
} else {
// Create a fresh topic. It will be removed when the job is done.
pubsubTopic = pubsubHelper.createTopic(shortTopic).getPath();
}
// Create/confirm the subscription.
if (configuration.pubSubMode == PubSubMode.PUBLISH_ONLY) {
// Nothing to consume.
} else if (options.getManageResources()) {
pubsubSubscription = pubsubHelper.createSubscription(shortTopic, shortSubscription).getPath();
} else {
// The subscription should already have been created by the user.
pubsubSubscription = pubsubHelper.reuseSubscription(shortTopic, shortSubscription).getPath();
}
}
// ================================================================================
// Construct overall pipeline
// ================================================================================
/** Return the source of events for this run, or null if we are only publishing events (to Pubsub or Kafka). */
private PCollection<Event> createSource(Pipeline p, final Instant now) throws IOException {
PCollection<Event> source = null;
switch (configuration.sourceType) {
case DIRECT:
source = sourceEventsFromSynthetic(p);
break;
case AVRO:
source = sourceEventsFromAvro(p);
break;
case KAFKA:
case PUBSUB:
if (configuration.sourceType == SourceType.PUBSUB) {
setupPubSubResources(now.getMillis());
}
// Setup the sink for the publisher.
switch (configuration.pubSubMode) {
case SUBSCRIBE_ONLY:
// Nothing to publish.
break;
case PUBLISH_ONLY:
{
// Send synthesized events to Kafka or Pubsub in this job.
PCollection<Event> events =
sourceEventsFromSynthetic(p)
.apply(queryName + ".Snoop", NexmarkUtils.snoop(queryName));
if (configuration.sourceType == NexmarkUtils.SourceType.KAFKA) {
sinkEventsToKafka(events);
} else { // pubsub
sinkEventsToPubsub(events);
}
}
break;
case COMBINED:
// Send synthesized events to Kafka or Pubsub in separate publisher job.
// We won't start the main pipeline until the publisher has sent the pre-load events.
// We'll shutdown the publisher job when we notice the main job has finished.
invokeBuilderForPublishOnlyPipeline(
publishOnlyOptions -> {
Pipeline sp = Pipeline.create(publishOnlyOptions);
NexmarkUtils.setupPipeline(configuration.coderStrategy, sp);
publisherMonitor = new Monitor<>(queryName, "publisher");
PCollection<Event> events =
sourceEventsFromSynthetic(sp)
.apply(queryName + ".Monitor", publisherMonitor.getTransform());
if (configuration.sourceType == NexmarkUtils.SourceType.KAFKA) {
sinkEventsToKafka(events);
} else { // pubsub
sinkEventsToPubsub(events);
}
publisherResult = sp.run();
NexmarkUtils.console("Publisher job is started.");
});
break;
}
// Setup the source for the consumer.
switch (configuration.pubSubMode) {
case PUBLISH_ONLY:
// Nothing to consume. Leave source null.
break;
case SUBSCRIBE_ONLY:
case COMBINED:
{
// Read events from Kafka or Pubsub.
if (configuration.sourceType == NexmarkUtils.SourceType.KAFKA) {
// In COMBINED mode the Publisher (sink) and Subscriber (source) pipelines must
// agree on where reading starts: the Subscriber should begin from the offset at
// which the Publisher started writing pre-load events, even when it runs right
// after the Publisher has finished. Otherwise, in SUBSCRIBE_ONLY mode, 'now' is
// passed as null and ignored.
source =
sourceEventsFromKafka(p, configuration.pubSubMode == COMBINED ? now : null);
} else {
source = sourceEventsFromPubsub(p);
}
}
break;
}
break;
}
return source;
}
private static final TupleTag<String> MAIN = new TupleTag<String>() {};
private static final TupleTag<String> SIDE = new TupleTag<String>() {};
private static class PartitionDoFn extends DoFn<String, String> {
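// Elements whose hash is even go to the default (MAIN) output; all others go to
// SIDE, splitting the results roughly in half.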
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().hashCode() % 2 == 0) {
c.output(c.element());
} else {
c.output(SIDE, c.element());
}
}
}
/** Consume {@code results}. */
private void sink(PCollection<TimestampedValue<KnownSize>> results, long now) {
if (configuration.sinkType == NexmarkUtils.SinkType.COUNT_ONLY) {
// Avoid the cost of formatting the results.
results.apply(queryName + ".DevNull", NexmarkUtils.devNull(queryName));
return;
}
PCollection<String> formattedResults =
results.apply(queryName + ".Format", NexmarkUtils.format(queryName));
if (options.getLogResults()) {
formattedResults =
formattedResults.apply(
queryName + ".Results.Log", NexmarkUtils.log(queryName + ".Results"));
}
switch (configuration.sinkType) {
case DEVNULL:
// Discard all results
formattedResults.apply(queryName + ".DevNull", NexmarkUtils.devNull(queryName));
break;
case PUBSUB:
sinkResultsToPubsub(formattedResults, now);
break;
case KAFKA:
sinkResultsToKafka(formattedResults);
break;
case TEXT:
sinkResultsToText(formattedResults, now);
break;
case AVRO:
NexmarkUtils.console(
"WARNING: with --sinkType=AVRO, actual query results will be discarded.");
break;
case BIGQUERY:
// Write the results to multiple BigQuery tables (main/side/copy) to mimic typical customer pipelines.
PCollectionTuple res =
formattedResults.apply(
queryName + ".Partition",
ParDo.of(new PartitionDoFn()).withOutputTags(MAIN, TupleTagList.of(SIDE)));
sinkResultsToBigQuery(res.get(MAIN), now, "main");
sinkResultsToBigQuery(res.get(SIDE), now, "side");
sinkResultsToBigQuery(formattedResults, now, "copy");
break;
case COUNT_ONLY:
// Short-circuited above.
throw new RuntimeException("COUNT_ONLY should have been short-circuited above");
}
}
// ================================================================================
// Entry point
// ================================================================================
/**
* Calculate the distribution of the expected rate of results per minute (in event time, not
* wallclock time).
*/
private void modelResultRates(NexmarkQueryModel model) {
List<Long> counts = Lists.newArrayList(model.simulator().resultsPerWindow());
Collections.sort(counts);
int n = counts.size();
if (n < 5) {
NexmarkUtils.console("Query%s: only %d samples", model.configuration.query, n);
} else {
NexmarkUtils.console(
"Query%d: N:%d; min:%d; 1st%%:%d; mean:%d; 3rd%%:%d; max:%d",
model.configuration.query,
n,
counts.get(0),
counts.get(n / 4),
counts.get(n / 2),
counts.get(n - 1 - n / 4),
counts.get(n - 1));
}
}
/** Run {@code configuration} and return its performance if possible. */
@Nullable
public NexmarkPerf run() throws IOException {
if (options.getManageResources() && !options.getMonitorJobs()) {
throw new RuntimeException("If using --manageResources then must also use --monitorJobs.");
}
//
// Setup per-run state.
//
checkState(queryName == null);
if (configuration.sourceType.equals(SourceType.PUBSUB)) {
pubsubHelper = PubsubHelper.create(options);
}
try {
NexmarkUtils.console("Running %s", configuration.toShortString());
if (configuration.numEvents < 0) {
NexmarkUtils.console("skipping since configuration is disabled");
return null;
}
NexmarkQuery<? extends KnownSize> query = getNexmarkQuery();
if (query == null) {
NexmarkUtils.console("skipping since configuration is not implemented");
return null;
}
queryName = query.getName();
// Append queryName to temp location
if (!"".equals(options.getTempLocation())) {
options.setTempLocation(options.getTempLocation() + "/" + queryName);
}
NexmarkQueryModel model = getNexmarkQueryModel();
if (options.getJustModelResultRate()) {
if (model == null) {
throw new RuntimeException(String.format("No model for %s", queryName));
}
modelResultRates(model);
return null;
}
final Instant now = Instant.now();
Pipeline p = Pipeline.create(options);
NexmarkUtils.setupPipeline(configuration.coderStrategy, p);
// Generate events.
PCollection<Event> source = createSource(p, now);
if (query.getTransform().needsSideInput()) {
query.getTransform().setSideInput(NexmarkUtils.prepareSideInput(p, configuration));
}
if (options.getLogEvents()) {
source = source.apply(queryName + ".Events.Log", NexmarkUtils.log(queryName + ".Events"));
}
// Source will be null in PUBLISH_ONLY mode, where events are only sent to
// Pubsub or Kafka. In that case there's nothing more to add to the pipeline.
if (source != null) {
// Optionally sink events in Avro format.
// (Query results are ignored).
if (configuration.sinkType == NexmarkUtils.SinkType.AVRO) {
sinkEventsToAvro(source);
}
// Query 10 logs all events to Google Cloud Storage files. It can generate a lot of log
// output, so cap its parallelism, and set the output path for the log files.
if (configuration.query == NexmarkQueryName.LOG_TO_SHARDED_FILES) {
String path = null;
if (options.getOutputPath() != null && !options.getOutputPath().isEmpty()) {
path = logsDir(now.getMillis());
}
((Query10) query.getTransform()).setOutputPath(path);
((Query10) query.getTransform()).setMaxNumWorkers(maxNumWorkers());
}
// Apply query.
PCollection<TimestampedValue<KnownSize>> results =
(PCollection<TimestampedValue<KnownSize>>) source.apply(query);
if (options.getAssertCorrectness()) {
if (model == null) {
throw new RuntimeException(String.format("No model for %s", queryName));
}
// We know all our streams have a finite number of elements.
results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
// If we have a finite number of events then assert our pipeline's
// results match those of a model using the same sequence of events.
PAssert.that(results).satisfies(model.assertionFor());
}
// Output results.
sink(results, now.getMillis());
}
mainResult = p.run();
mainResult.waitUntilFinish(Duration.standardSeconds(configuration.streamTimeout));
return monitor(query);
} finally {
if (pubsubHelper != null) {
pubsubHelper.cleanup();
pubsubHelper = null;
}
configuration = null;
queryName = null;
}
}
private boolean isSql() {
return SQL.equalsIgnoreCase(options.getQueryLanguage());
}
private NexmarkQueryModel getNexmarkQueryModel() {
Map<NexmarkQueryName, NexmarkQueryModel> models = createQueryModels();
return models.get(configuration.query);
}
private NexmarkQuery<?> getNexmarkQuery() {
Map<NexmarkQueryName, NexmarkQuery> queries = createQueries();
return queries.get(configuration.query);
}
private Map<NexmarkQueryName, NexmarkQueryModel> createQueryModels() {
return isSql() ? createSqlQueryModels() : createJavaQueryModels();
}
private Map<NexmarkQueryName, NexmarkQueryModel> createSqlQueryModels() {
return ImmutableMap.of();
}
private Map<NexmarkQueryName, NexmarkQueryModel> createJavaQueryModels() {
return ImmutableMap.<NexmarkQueryName, NexmarkQueryModel>builder()
.put(NexmarkQueryName.PASSTHROUGH, new Query0Model(configuration))
.put(NexmarkQueryName.CURRENCY_CONVERSION, new Query1Model(configuration))
.put(NexmarkQueryName.SELECTION, new Query2Model(configuration))
.put(NexmarkQueryName.LOCAL_ITEM_SUGGESTION, new Query3Model(configuration))
.put(NexmarkQueryName.AVERAGE_PRICE_FOR_CATEGORY, new Query4Model(configuration))
.put(NexmarkQueryName.HOT_ITEMS, new Query5Model(configuration))
.put(NexmarkQueryName.AVERAGE_SELLING_PRICE_BY_SELLER, new Query6Model(configuration))
.put(NexmarkQueryName.HIGHEST_BID, new Query7Model(configuration))
.put(NexmarkQueryName.MONITOR_NEW_USERS, new Query8Model(configuration))
.put(NexmarkQueryName.WINNING_BIDS, new Query9Model(configuration))
.put(NexmarkQueryName.BOUNDED_SIDE_INPUT_JOIN, new BoundedSideInputJoinModel(configuration))
.put(NexmarkQueryName.SESSION_SIDE_INPUT_JOIN, new SessionSideInputJoinModel(configuration))
.build();
}
private Map<NexmarkQueryName, NexmarkQuery> createQueries() {
return isSql() ? createSqlQueries() : createJavaQueries();
}
private Map<NexmarkQueryName, NexmarkQuery> createSqlQueries() {
return ImmutableMap.<NexmarkQueryName, NexmarkQuery>builder()
.put(NexmarkQueryName.PASSTHROUGH, new NexmarkQuery(configuration, new SqlQuery0()))
.put(NexmarkQueryName.CURRENCY_CONVERSION, new NexmarkQuery(configuration, new SqlQuery1()))
.put(
NexmarkQueryName.SELECTION,
new NexmarkQuery(configuration, new SqlQuery2(configuration.auctionSkip)))
.put(
NexmarkQueryName.LOCAL_ITEM_SUGGESTION,
new NexmarkQuery(configuration, new SqlQuery3(configuration)))
// SqlQuery5 is disabled for now: it uses non-equi-joins, never worked right,
// and was giving incorrect results. It gets rejected after PR/8301, causing
// failures.
//
// See:
// https://issues.apache.org/jira/browse/BEAM-7072
// https://github.com/apache/beam/pull/8301
// https://github.com/apache/beam/pull/8422#issuecomment-487676350
//
// .put(
// NexmarkQueryName.HOT_ITEMS,
// new NexmarkQuery(configuration, new SqlQuery5(configuration)))
.put(
NexmarkQueryName.HIGHEST_BID,
new NexmarkQuery(configuration, new SqlQuery7(configuration)))
.put(
NexmarkQueryName.BOUNDED_SIDE_INPUT_JOIN,
new NexmarkQuery(configuration, new SqlBoundedSideInputJoin(configuration)))
.build();
}
private Map<NexmarkQueryName, NexmarkQuery> createJavaQueries() {
return ImmutableMap.<NexmarkQueryName, NexmarkQuery>builder()
.put(NexmarkQueryName.PASSTHROUGH, new NexmarkQuery(configuration, new Query0()))
.put(
NexmarkQueryName.CURRENCY_CONVERSION,
new NexmarkQuery(configuration, new Query1(configuration)))
.put(NexmarkQueryName.SELECTION, new NexmarkQuery(configuration, new Query2(configuration)))
.put(
NexmarkQueryName.LOCAL_ITEM_SUGGESTION,
new NexmarkQuery(configuration, new Query3(configuration)))
.put(
NexmarkQueryName.AVERAGE_PRICE_FOR_CATEGORY,
new NexmarkQuery(configuration, new Query4(configuration)))
.put(NexmarkQueryName.HOT_ITEMS, new NexmarkQuery(configuration, new Query5(configuration)))
.put(
NexmarkQueryName.AVERAGE_SELLING_PRICE_BY_SELLER,
new NexmarkQuery(configuration, new Query6(configuration)))
.put(
NexmarkQueryName.HIGHEST_BID,
new NexmarkQuery(configuration, new Query7(configuration)))
.put(
NexmarkQueryName.MONITOR_NEW_USERS,
new NexmarkQuery(configuration, new Query8(configuration)))
.put(
NexmarkQueryName.WINNING_BIDS,
new NexmarkQuery(configuration, new Query9(configuration)))
.put(
NexmarkQueryName.LOG_TO_SHARDED_FILES,
new NexmarkQuery(configuration, new Query10(configuration)))
.put(
NexmarkQueryName.USER_SESSIONS,
new NexmarkQuery(configuration, new Query11(configuration)))
.put(
NexmarkQueryName.PROCESSING_TIME_WINDOWS,
new NexmarkQuery(configuration, new Query12(configuration)))
.put(
NexmarkQueryName.BOUNDED_SIDE_INPUT_JOIN,
new NexmarkQuery(configuration, new BoundedSideInputJoin(configuration)))
.put(
NexmarkQueryName.SESSION_SIDE_INPUT_JOIN,
new NexmarkQuery(configuration, new SessionSideInputJoin(configuration)))
.build();
}
private static class PubsubMessageEventDoFn extends DoFn<PubsubMessage, Event> {
@ProcessElement
public void processElement(ProcessContext c) throws IOException {
byte[] payload = c.element().getPayload();
Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload);
c.output(event);
}
}
private static class EventPubsubMessageDoFn extends DoFn<Event, PubsubMessage> {
@ProcessElement
public void processElement(ProcessContext c) throws IOException {
byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element());
c.output(new PubsubMessage(payload, Collections.emptyMap()));
}
}
}