| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.nexmark.sources; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.NoSuchElementException; |
| import java.util.PriorityQueue; |
| import java.util.Queue; |
| import java.util.concurrent.ThreadLocalRandom; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.io.UnboundedSource; |
| import org.apache.beam.sdk.nexmark.NexmarkUtils; |
| import org.apache.beam.sdk.nexmark.model.Event; |
| import org.apache.beam.sdk.nexmark.sources.generator.Generator; |
| import org.apache.beam.sdk.nexmark.sources.generator.GeneratorCheckpoint; |
| import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.sdk.values.TimestampedValue; |
| import org.checkerframework.checker.nullness.qual.Nullable; |
| import org.joda.time.Duration; |
| import org.joda.time.Instant; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A custom, unbounded source of event records. |
| * |
| * <p>If {@code isRateLimited} is true, events become available for return from the reader such that |
| * the overall rate respect the {@code interEventDelayUs} period if possible. Otherwise, events are |
| * returned every time the system asks for one. |
| */ |
| @SuppressWarnings({ |
| "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) |
| }) |
| public class UnboundedEventSource extends UnboundedSource<Event, GeneratorCheckpoint> { |
| private static final Duration BACKLOG_PERIOD = Duration.standardSeconds(30); |
| private static final Logger LOG = LoggerFactory.getLogger(UnboundedEventSource.class); |
| |
| /** Configuration for generator to use when reading synthetic events. May be split. */ |
| private final GeneratorConfig config; |
| |
| /** How many unbounded sources to create. */ |
| private final int numEventGenerators; |
| |
| /** How many seconds to hold back the watermark. */ |
| private final long watermarkHoldbackSec; |
| |
| /** Are we rate limiting the events? */ |
| private final boolean isRateLimited; |
| |
| public UnboundedEventSource( |
| GeneratorConfig config, |
| int numEventGenerators, |
| long watermarkHoldbackSec, |
| boolean isRateLimited) { |
| this.config = config; |
| this.numEventGenerators = numEventGenerators; |
| this.watermarkHoldbackSec = watermarkHoldbackSec; |
| this.isRateLimited = isRateLimited; |
| } |
| |
| /** A reader to pull events from the generator. */ |
| private class EventReader extends UnboundedReader<Event> { |
| /** Generator we are reading from. */ |
| private final Generator generator; |
| |
| /** |
| * Current watermark (ms since epoch). Initially set to beginning of time. Then updated to be |
| * the time of the next generated event. Then, once all events have been generated, set to the |
| * end of time. |
| */ |
| private long watermark; |
| |
| /** |
| * Current backlog (ms), as delay between timestamp of last returned event and the timestamp we |
| * should be up to according to wall-clock time. Used only for logging. |
| */ |
| private long backlogDurationMs; |
| |
| /** |
| * Current backlog, as estimated number of event bytes we are behind, or null if unknown. |
| * Reported to callers. |
| */ |
| private long backlogBytes; |
| |
| /** Wallclock time (ms since epoch) we last reported the backlog, or -1 if never reported. */ |
| private long lastReportedBacklogWallclock; |
| |
| /** |
| * Event time (ms since epoch) of pending event at last reported backlog, or -1 if never |
| * calculated. |
| */ |
| private long timestampAtLastReportedBacklogMs; |
| |
| /** Next event to make 'current' when wallclock time has advanced sufficiently. */ |
| private @Nullable TimestampedValue<Event> pendingEvent; |
| |
| /** Wallclock time when {@link #pendingEvent} is due, or -1 if no pending event. */ |
| private long pendingEventWallclockTime; |
| |
| /** Current event to return from getCurrent. */ |
| private @Nullable TimestampedValue<Event> currentEvent; |
| |
| /** Events which have been held back so as to force them to be late. */ |
| private final Queue<Generator.NextEvent> heldBackEvents = new PriorityQueue<>(); |
| |
| public EventReader(Generator generator) { |
| this.generator = generator; |
| watermark = NexmarkUtils.BEGINNING_OF_TIME.getMillis(); |
| lastReportedBacklogWallclock = -1; |
| pendingEventWallclockTime = -1; |
| timestampAtLastReportedBacklogMs = -1; |
| updateBacklog(System.currentTimeMillis(), 0); |
| } |
| |
| public EventReader(GeneratorConfig config) { |
| this(new Generator(config)); |
| } |
| |
| @Override |
| public boolean start() { |
| LOG.trace("starting unbounded generator {}", generator); |
| return advance(); |
| } |
| |
| @Override |
| public boolean advance() { |
| long now = System.currentTimeMillis(); |
| |
| while (pendingEvent == null) { |
| if (!generator.hasNext() && heldBackEvents.isEmpty()) { |
| // No more events, EVER. |
| updateBacklog(System.currentTimeMillis(), 0); |
| if (watermark < BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { |
| watermark = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); |
| LOG.trace("stopped unbounded generator {}", generator); |
| } |
| return false; |
| } |
| |
| Generator.NextEvent next = heldBackEvents.peek(); |
| if (next != null && next.wallclockTimestamp <= now) { |
| // Time to use the held-back event. |
| heldBackEvents.poll(); |
| LOG.debug( |
| "replaying held-back event {}ms behind watermark", watermark - next.eventTimestamp); |
| } else if (generator.hasNext()) { |
| next = generator.nextEvent(); |
| if (isRateLimited |
| && config.getProbDelayedEvent() > 0.0 |
| && config.getOccasionalDelaySec() > 0 |
| && ThreadLocalRandom.current().nextDouble() < config.getProbDelayedEvent()) { |
| // We'll hold back this event and go around again. |
| long delayMs = |
| ThreadLocalRandom.current().nextLong(config.getOccasionalDelaySec() * 1000) + 1L; |
| LOG.debug("delaying event by {}ms", delayMs); |
| heldBackEvents.add(next.withDelay(delayMs)); |
| continue; |
| } |
| } else { |
| // Waiting for held-back event to fire. |
| updateBacklog(now, 0); |
| return false; |
| } |
| |
| pendingEventWallclockTime = next.wallclockTimestamp; |
| pendingEvent = TimestampedValue.of(next.event, new Instant(next.eventTimestamp)); |
| long newWatermark = |
| next.watermark - Duration.standardSeconds(watermarkHoldbackSec).getMillis(); |
| if (newWatermark > watermark) { |
| watermark = newWatermark; |
| } |
| } |
| |
| if (isRateLimited) { |
| if (pendingEventWallclockTime > now) { |
| // We want this event to fire in the future. Try again later. |
| updateBacklog(now, 0); |
| return false; |
| } |
| updateBacklog(now, now - pendingEventWallclockTime); |
| } else { |
| updateBacklog(now, 0); |
| } |
| |
| // This event is ready to fire. |
| currentEvent = pendingEvent; |
| pendingEvent = null; |
| return true; |
| } |
| |
| private void updateBacklog(long now, long newBacklogDurationMs) { |
| backlogDurationMs = newBacklogDurationMs; |
| long interEventDelayUs = generator.currentInterEventDelayUs(); |
| if (isRateLimited && interEventDelayUs > 0) { |
| long backlogEvents = (backlogDurationMs * 1000 + interEventDelayUs - 1) / interEventDelayUs; |
| backlogBytes = generator.getCurrentConfig().estimatedBytesForEvents(backlogEvents); |
| } else { |
| double fractionRemaining = 1.0 - generator.getFractionConsumed(); |
| backlogBytes = |
| Math.max( |
| 0L, |
| (long) (generator.getCurrentConfig().getEstimatedSizeBytes() * fractionRemaining)); |
| } |
| if (lastReportedBacklogWallclock < 0 |
| || now - lastReportedBacklogWallclock > BACKLOG_PERIOD.getMillis()) { |
| double timeDilation = Double.NaN; |
| if (pendingEvent != null |
| && lastReportedBacklogWallclock >= 0 |
| && timestampAtLastReportedBacklogMs >= 0) { |
| long wallclockProgressionMs = now - lastReportedBacklogWallclock; |
| long eventTimeProgressionMs = |
| pendingEvent.getTimestamp().getMillis() - timestampAtLastReportedBacklogMs; |
| timeDilation = (double) eventTimeProgressionMs / (double) wallclockProgressionMs; |
| } |
| LOG.debug( |
| "unbounded generator backlog now {}ms ({} bytes) at {}us interEventDelay " |
| + "with {} time dilation", |
| backlogDurationMs, |
| backlogBytes, |
| interEventDelayUs, |
| timeDilation); |
| lastReportedBacklogWallclock = now; |
| if (pendingEvent != null) { |
| timestampAtLastReportedBacklogMs = pendingEvent.getTimestamp().getMillis(); |
| } |
| } |
| } |
| |
| @Override |
| public Event getCurrent() { |
| if (currentEvent == null) { |
| throw new NoSuchElementException(); |
| } |
| return currentEvent.getValue(); |
| } |
| |
| @Override |
| public Instant getCurrentTimestamp() { |
| if (currentEvent == null) { |
| throw new NoSuchElementException(); |
| } |
| return currentEvent.getTimestamp(); |
| } |
| |
| @Override |
| public void close() { |
| // Nothing to close. |
| } |
| |
| @Override |
| public UnboundedEventSource getCurrentSource() { |
| return UnboundedEventSource.this; |
| } |
| |
| @Override |
| public Instant getWatermark() { |
| return new Instant(watermark); |
| } |
| |
| @Override |
| public GeneratorCheckpoint getCheckpointMark() { |
| return generator.toCheckpoint(); |
| } |
| |
| @Override |
| public long getSplitBacklogBytes() { |
| return backlogBytes; |
| } |
| |
| @Override |
| public String toString() { |
| return String.format( |
| "EventReader(%d, %d, %d)", |
| generator.getCurrentConfig().getStartEventId(), |
| generator.getNextEventId(), |
| generator.getCurrentConfig().getStopEventId()); |
| } |
| } |
| |
| @Override |
| public Coder<GeneratorCheckpoint> getCheckpointMarkCoder() { |
| return GeneratorCheckpoint.CODER_INSTANCE; |
| } |
| |
| @Override |
| public List<UnboundedEventSource> split(int desiredNumSplits, PipelineOptions options) { |
| LOG.trace("splitting unbounded source into {} sub-sources", numEventGenerators); |
| List<UnboundedEventSource> results = new ArrayList<>(); |
| // Ignore desiredNumSplits and use numEventGenerators instead. |
| for (GeneratorConfig subConfig : config.split(numEventGenerators)) { |
| results.add(new UnboundedEventSource(subConfig, 1, watermarkHoldbackSec, isRateLimited)); |
| } |
| return results; |
| } |
| |
| @Override |
| public EventReader createReader( |
| PipelineOptions options, @Nullable GeneratorCheckpoint checkpoint) { |
| if (checkpoint == null) { |
| LOG.trace("creating initial unbounded reader for {}", config); |
| return new EventReader(config); |
| } else { |
| LOG.trace("resuming unbounded reader from {}", checkpoint); |
| return new EventReader(checkpoint.toGenerator(config)); |
| } |
| } |
| |
| @Override |
| public void validate() { |
| // Nothing to validate. |
| } |
| |
| @Override |
| public Coder<Event> getDefaultOutputCoder() { |
| return Event.CODER; |
| } |
| |
| @Override |
| public String toString() { |
| return String.format( |
| "UnboundedEventSource(%d, %d)", config.getStartEventId(), config.getStopEventId()); |
| } |
| } |