/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| package org.apache.beam.sdk.nexmark.queries; |
| |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.io.Serializable; |
| import java.nio.channels.Channels; |
| import java.nio.channels.WritableByteChannel; |
| import java.nio.charset.StandardCharsets; |
| import java.util.concurrent.ThreadLocalRandom; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; |
| import org.apache.beam.sdk.metrics.Counter; |
| import org.apache.beam.sdk.metrics.Metrics; |
| import org.apache.beam.sdk.nexmark.NexmarkConfiguration; |
| import org.apache.beam.sdk.nexmark.model.Done; |
| import org.apache.beam.sdk.nexmark.model.Event; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.GroupByKey; |
| import org.apache.beam.sdk.transforms.ParDo; |
| import org.apache.beam.sdk.transforms.windowing.AfterEach; |
| import org.apache.beam.sdk.transforms.windowing.AfterFirst; |
| import org.apache.beam.sdk.transforms.windowing.AfterPane; |
| import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; |
| import org.apache.beam.sdk.transforms.windowing.AfterWatermark; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.sdk.transforms.windowing.FixedWindows; |
| import org.apache.beam.sdk.transforms.windowing.PaneInfo; |
| import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; |
| import org.apache.beam.sdk.transforms.windowing.Repeatedly; |
| import org.apache.beam.sdk.transforms.windowing.Window; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.joda.time.Duration; |
| import org.joda.time.Instant; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Query "10", 'Log to sharded files' (Not in original suite.) |
| * |
| * <p>Every windowSizeSec, save all events from the last period into 2*maxWorkers log files. |
| */ |
| public class Query10 extends NexmarkQueryTransform<Done> { |
| private static final Logger LOG = LoggerFactory.getLogger(Query10.class); |
| private static final int NUM_SHARDS_PER_WORKER = 5; |
| private static final Duration LATE_BATCHING_PERIOD = Duration.standardSeconds(10); |
| private final NexmarkConfiguration configuration; |
| |
| /** Capture everything we need to know about the records in a single output file. */ |
| private static class OutputFile implements Serializable { |
| /** Maximum possible timestamp of records in file. */ |
| private final Instant maxTimestamp; |
| /** Shard within window. */ |
| private final String shard; |
| /** Index of file in all files in shard. */ |
| private final long index; |
| /** Timing of records in this file. */ |
| private final PaneInfo.Timing timing; |
| /** Path to file containing records, or {@literal null} if no output required. */ |
| @Nullable private final String filename; |
| |
| public OutputFile( |
| Instant maxTimestamp, |
| String shard, |
| long index, |
| PaneInfo.Timing timing, |
| @Nullable String filename) { |
| this.maxTimestamp = maxTimestamp; |
| this.shard = shard; |
| this.index = index; |
| this.timing = timing; |
| this.filename = filename; |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("%s %s %d %s %s%n", maxTimestamp, shard, index, timing, filename); |
| } |
| } |
| |
| /** GCS uri prefix for all log and 'finished' files. If null they won't be written. */ |
| @Nullable private String outputPath; |
| |
| /** Maximum number of workers, used to determine log sharding factor. */ |
| private int maxNumWorkers; |
| |
| public Query10(NexmarkConfiguration configuration) { |
| super("Query10"); |
| this.configuration = configuration; |
| } |
| |
| public void setOutputPath(@Nullable String outputPath) { |
| this.outputPath = outputPath; |
| } |
| |
| public void setMaxNumWorkers(int maxNumWorkers) { |
| this.maxNumWorkers = maxNumWorkers; |
| } |
| |
| /** Return channel for writing bytes to GCS. */ |
| private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename) |
| throws IOException { |
| // TODO |
| // Fix after PR: right now this is a specific Google added use case |
| // Discuss it on ML: shall we keep GCS or use HDFS or use a generic beam filesystem way. |
| throw new UnsupportedOperationException("Disabled after removal of GcsIOChannelFactory"); |
| } |
| |
| /** Return a short string to describe {@code timing}. */ |
| private String timingToString(PaneInfo.Timing timing) { |
| switch (timing) { |
| case EARLY: |
| return "E"; |
| case ON_TIME: |
| return "O"; |
| case LATE: |
| return "L"; |
| case UNKNOWN: |
| return "U"; |
| } |
| throw new RuntimeException(); // cases are exhaustive |
| } |
| |
| /** Construct an {@link OutputFile} for {@code pane} in {@code window} for {@code shard}. */ |
| private OutputFile outputFileFor(BoundedWindow window, String shard, PaneInfo pane) { |
| @Nullable |
| String filename = |
| outputPath == null |
| ? null |
| : String.format( |
| "%s/LOG-%s-%s-%03d-%s-%x", |
| outputPath, |
| window.maxTimestamp(), |
| shard, |
| pane.getIndex(), |
| timingToString(pane.getTiming()), |
| ThreadLocalRandom.current().nextLong()); |
| return new OutputFile( |
| window.maxTimestamp(), shard, pane.getIndex(), pane.getTiming(), filename); |
| } |
| |
| /** |
| * Return path to which we should write the index for {@code window}, or {@literal null} if no |
| * output required. |
| */ |
| @Nullable |
| private String indexPathFor(BoundedWindow window) { |
| if (outputPath == null) { |
| return null; |
| } |
| return String.format("%s/INDEX-%s", outputPath, window.maxTimestamp()); |
| } |
| |
| @Override |
| public PCollection<Done> expand(PCollection<Event> events) { |
| final int numLogShards = maxNumWorkers * NUM_SHARDS_PER_WORKER; |
| |
| return events |
| .apply( |
| name + ".ShardEvents", |
| ParDo.of( |
| new DoFn<Event, KV<String, Event>>() { |
| private final Counter lateCounter = Metrics.counter(name, "actuallyLateEvent"); |
| private final Counter onTimeCounter = Metrics.counter(name, "onTimeCounter"); |
| |
| @ProcessElement |
| public void processElement(ProcessContext c) { |
| if (c.element().hasAnnotation("LATE")) { |
| lateCounter.inc(); |
| LOG.info("Observed late: %s", c.element()); |
| } else { |
| onTimeCounter.inc(); |
| } |
| int shardNum = (int) Math.abs((long) c.element().hashCode() % numLogShards); |
| String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards); |
| c.output(KV.of(shard, c.element())); |
| } |
| })) |
| .apply( |
| name + ".WindowEvents", |
| Window.<KV<String, Event>>into( |
| FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))) |
| .triggering( |
| AfterEach.inOrder( |
| Repeatedly.forever( |
| AfterPane.elementCountAtLeast(configuration.maxLogEvents)) |
| .orFinally(AfterWatermark.pastEndOfWindow()), |
| Repeatedly.forever( |
| AfterFirst.of( |
| AfterPane.elementCountAtLeast(configuration.maxLogEvents), |
| AfterProcessingTime.pastFirstElementInPane() |
| .plusDelayOf(LATE_BATCHING_PERIOD))))) |
| .discardingFiredPanes() |
| // Use a 1 day allowed lateness so that any forgotten hold will stall the |
| // pipeline for that period and be very noticeable. |
| .withAllowedLateness(Duration.standardDays(1))) |
| .apply(name + ".GroupByKey", GroupByKey.create()) |
| .apply( |
| name + ".CheckForLateEvents", |
| ParDo.of( |
| new DoFn<KV<String, Iterable<Event>>, KV<String, Iterable<Event>>>() { |
| private final Counter earlyCounter = Metrics.counter(name, "earlyShard"); |
| private final Counter onTimeCounter = Metrics.counter(name, "onTimeShard"); |
| private final Counter lateCounter = Metrics.counter(name, "lateShard"); |
| private final Counter unexpectedLatePaneCounter = |
| Metrics.counter(name, "ERROR_unexpectedLatePane"); |
| private final Counter unexpectedOnTimeElementCounter = |
| Metrics.counter(name, "ERROR_unexpectedOnTimeElement"); |
| |
| @ProcessElement |
| public void processElement(ProcessContext c, BoundedWindow window) { |
| int numLate = 0; |
| int numOnTime = 0; |
| for (Event event : c.element().getValue()) { |
| if (event.hasAnnotation("LATE")) { |
| numLate++; |
| } else { |
| numOnTime++; |
| } |
| } |
| String shard = c.element().getKey(); |
| LOG.info( |
| String.format( |
| "%s with timestamp %s has %d actually late and %d on-time " |
| + "elements in pane %s for window %s", |
| shard, |
| c.timestamp(), |
| numLate, |
| numOnTime, |
| c.pane(), |
| window.maxTimestamp())); |
| if (c.pane().getTiming() == PaneInfo.Timing.LATE) { |
| if (numLate == 0) { |
| LOG.error("ERROR! No late events in late pane for %s", shard); |
| unexpectedLatePaneCounter.inc(); |
| } |
| if (numOnTime > 0) { |
| LOG.error( |
| "ERROR! Have %d on-time events in late pane for %s", numOnTime, shard); |
| unexpectedOnTimeElementCounter.inc(); |
| } |
| lateCounter.inc(); |
| } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) { |
| if (numOnTime + numLate < configuration.maxLogEvents) { |
| LOG.error( |
| "ERROR! Only have %d events in early pane for %s", |
| numOnTime + numLate, shard); |
| } |
| earlyCounter.inc(); |
| } else { |
| onTimeCounter.inc(); |
| } |
| c.output(c.element()); |
| } |
| })) |
| .apply( |
| name + ".UploadEvents", |
| ParDo.of( |
| new DoFn<KV<String, Iterable<Event>>, KV<Void, OutputFile>>() { |
| private final Counter savedFileCounter = Metrics.counter(name, "savedFile"); |
| private final Counter writtenRecordsCounter = |
| Metrics.counter(name, "writtenRecords"); |
| |
| @ProcessElement |
| public void processElement(ProcessContext c, BoundedWindow window) |
| throws IOException { |
| String shard = c.element().getKey(); |
| GcsOptions options = c.getPipelineOptions().as(GcsOptions.class); |
| OutputFile outputFile = outputFileFor(window, shard, c.pane()); |
| LOG.info( |
| String.format( |
| "Writing %s with record timestamp %s, window timestamp %s, pane %s", |
| shard, c.timestamp(), window.maxTimestamp(), c.pane())); |
| if (outputFile.filename != null) { |
| LOG.info("Beginning write to '%s'", outputFile.filename); |
| int n = 0; |
| try (OutputStream output = |
| Channels.newOutputStream( |
| openWritableGcsFile(options, outputFile.filename))) { |
| for (Event event : c.element().getValue()) { |
| Event.CODER.encode(event, output, Coder.Context.OUTER); |
| writtenRecordsCounter.inc(); |
| if (++n % 10000 == 0) { |
| LOG.info("So far written %d records to '%s'", n, outputFile.filename); |
| } |
| } |
| } |
| LOG.info("Written all %d records to '%s'", n, outputFile.filename); |
| } |
| savedFileCounter.inc(); |
| c.output(KV.of(null, outputFile)); |
| } |
| })) |
| // Clear fancy triggering from above. |
| .apply( |
| name + ".WindowLogFiles", |
| Window.<KV<Void, OutputFile>>into( |
| FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))) |
| .triggering(AfterWatermark.pastEndOfWindow()) |
| // We expect no late data here, but we'll assume the worst so we can detect any. |
| .withAllowedLateness(Duration.standardDays(1)) |
| .discardingFiredPanes()) |
| // this GroupByKey allows to have one file per window |
| .apply(name + ".GroupByKey2", GroupByKey.create()) |
| .apply( |
| name + ".Index", |
| ParDo.of( |
| new DoFn<KV<Void, Iterable<OutputFile>>, Done>() { |
| private final Counter unexpectedLateCounter = |
| Metrics.counter(name, "ERROR_unexpectedLate"); |
| private final Counter unexpectedEarlyCounter = |
| Metrics.counter(name, "ERROR_unexpectedEarly"); |
| private final Counter unexpectedIndexCounter = |
| Metrics.counter(name, "ERROR_unexpectedIndex"); |
| private final Counter finalizedCounter = Metrics.counter(name, "indexed"); |
| |
| @ProcessElement |
| public void processElement(ProcessContext c, BoundedWindow window) |
| throws IOException { |
| if (c.pane().getTiming() == Timing.LATE) { |
| unexpectedLateCounter.inc(); |
| LOG.error("ERROR! Unexpected LATE pane: %s", c.pane()); |
| } else if (c.pane().getTiming() == Timing.EARLY) { |
| unexpectedEarlyCounter.inc(); |
| LOG.error("ERROR! Unexpected EARLY pane: %s", c.pane()); |
| } else if (c.pane().getTiming() == Timing.ON_TIME && c.pane().getIndex() != 0) { |
| unexpectedIndexCounter.inc(); |
| LOG.error("ERROR! Unexpected ON_TIME pane index: %s", c.pane()); |
| } else { |
| GcsOptions options = c.getPipelineOptions().as(GcsOptions.class); |
| LOG.info( |
| "Index with record timestamp %s, window timestamp %s, pane %s", |
| c.timestamp(), window.maxTimestamp(), c.pane()); |
| |
| @Nullable String filename = indexPathFor(window); |
| if (filename != null) { |
| LOG.info("Beginning write to '%s'", filename); |
| int n = 0; |
| try (OutputStream output = |
| Channels.newOutputStream(openWritableGcsFile(options, filename))) { |
| for (OutputFile outputFile : c.element().getValue()) { |
| output.write(outputFile.toString().getBytes(StandardCharsets.UTF_8)); |
| n++; |
| } |
| } |
| LOG.info("Written all %d lines to '%s'", n, filename); |
| } |
| c.output(new Done("written for timestamp " + window.maxTimestamp())); |
| finalizedCounter.inc(); |
| } |
| } |
| })); |
| } |
| } |