blob: a766449abce1e9bc5101272e18593b72b36b21a0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.triggers;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.MergingStateAccessor;
import org.apache.beam.runners.core.StateAccessor;
import org.apache.beam.runners.core.StateMerging;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.GroupingState;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.Combine.Holder;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.PeriodFormat;
import org.joda.time.format.PeriodFormatter;
/**
* A base class for triggers that happen after a processing time delay from the arrival of the first
* element in a pane.
*
* <p>This class is for internal use only and may change at any time.
*/
// This class should be inlined to subclasses and deleted, simplifying them too
// https://issues.apache.org/jira/browse/BEAM-1486
@Experimental(Experimental.Kind.TRIGGER)
public abstract class AfterDelayFromFirstElementStateMachine extends TriggerStateMachine {
protected static final List<SerializableFunction<Instant, Instant>> IDENTITY = ImmutableList.of();
protected static final StateTag<CombiningState<Instant, Holder<Instant>, Instant>>
DELAYED_UNTIL_TAG =
StateTags.makeSystemTagInternal(
StateTags.combiningValueFromInputInternal(
"delayed", InstantCoder.of(), Min.naturalOrder()));
private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH);
/** To complete an implementation, return the desired time from the TriggerContext. */
@Nullable
public abstract Instant getCurrentTime(TriggerStateMachine.TriggerContext context);
/**
* To complete an implementation, return a new instance like this one, but incorporating the
* provided timestamp mapping functions. Generally should be used by calling the constructor of
* this class from the constructor of the subclass.
*/
protected abstract AfterDelayFromFirstElementStateMachine newWith(
List<SerializableFunction<Instant, Instant>> transform);
/**
* A list of timestampMappers m1, m2, m3, ... m_n considered to be composed in sequence. The
* overall mapping for an instance `instance` is `m_n(... m3(m2(m1(instant))`, implemented via
* #computeTargetTimestamp
*/
protected final List<SerializableFunction<Instant, Instant>> timestampMappers;
protected final TimeDomain timeDomain;
public AfterDelayFromFirstElementStateMachine(
TimeDomain timeDomain, List<SerializableFunction<Instant, Instant>> timestampMappers) {
super(null);
this.timestampMappers = timestampMappers;
this.timeDomain = timeDomain;
}
private Instant getTargetTimestamp(OnElementContext c) {
return computeTargetTimestamp(c.currentProcessingTime());
}
/** The time domain according to which this trigger sets timers. */
public TimeDomain getTimeDomain() {
return timeDomain;
}
/**
* The mapping functions applied to the arrival time of an element to determine when to set a
* wake-up timer for triggering.
*/
public List<SerializableFunction<Instant, Instant>> getTimestampMappers() {
return timestampMappers;
}
/**
* Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater
* than the timestamp.
*
* <p>TODO: Consider sharing this with FixedWindows, and bring over the equivalent of
* CalendarWindows.
*/
public AfterDelayFromFirstElementStateMachine alignedTo(
final Duration size, final Instant offset) {
return newWith(new AlignFn(size, offset));
}
/**
* Aligns the time to be the smallest multiple of {@code size} greater than the timestamp since
* the epoch.
*/
public AfterDelayFromFirstElementStateMachine alignedTo(final Duration size) {
return alignedTo(size, new Instant(0));
}
/**
* Adds some delay to the original target time.
*
* @param delay the delay to add
* @return An updated time trigger that will wait the additional time before firing.
*/
public AfterDelayFromFirstElementStateMachine plusDelayOf(final Duration delay) {
return newWith(new DelayFn(delay));
}
@Override
public boolean isCompatible(TriggerStateMachine other) {
if (!getClass().equals(other.getClass())) {
return false;
}
AfterDelayFromFirstElementStateMachine that = (AfterDelayFromFirstElementStateMachine) other;
return this.timestampMappers.equals(that.timestampMappers);
}
private AfterDelayFromFirstElementStateMachine newWith(
SerializableFunction<Instant, Instant> timestampMapper) {
return newWith(
ImmutableList.<SerializableFunction<Instant, Instant>>builder()
.addAll(timestampMappers)
.add(timestampMapper)
.build());
}
@Override
public void prefetchOnElement(StateAccessor<?> state) {
state.access(DELAYED_UNTIL_TAG).readLater();
}
@Override
public void onElement(OnElementContext c) throws Exception {
GroupingState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG);
Instant oldDelayUntil = delayUntilState.read();
// Since processing time can only advance, resulting in target wake-up times we would
// ignore anyhow, we don't bother with it if it is already set.
if (oldDelayUntil != null) {
return;
}
Instant targetTimestamp = getTargetTimestamp(c);
delayUntilState.add(targetTimestamp);
c.setTimer(targetTimestamp, timeDomain);
}
@Override
public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
super.prefetchOnMerge(state);
StateMerging.prefetchCombiningValues(state, DELAYED_UNTIL_TAG);
}
@Override
public void onMerge(OnMergeContext c) throws Exception {
// NOTE: We could try to delete all timers which are still active, but we would
// need access to a timer context for each merging window.
// for (CombiningValueStateInternal<Instant, Combine.Holder<Instant>, Instant> state :
// c.state().accessInEachMergingWindow(DELAYED_UNTIL_TAG).values()) {
// Instant timestamp = state.get().read();
// if (timestamp != null) {
// <context for merging window>.deleteTimer(timestamp, timeDomain);
// }
// }
// Instead let them fire and be ignored.
// If the trigger is already finished, there is no way it will become re-activated
if (c.trigger().isFinished()) {
StateMerging.clear(c.state(), DELAYED_UNTIL_TAG);
// NOTE: We do not attempt to delete the timers.
return;
}
// Determine the earliest point across all the windows, and delay to that.
StateMerging.mergeCombiningValues(c.state(), DELAYED_UNTIL_TAG);
Instant earliestTargetTime = c.state().access(DELAYED_UNTIL_TAG).read();
if (earliestTargetTime != null) {
c.setTimer(earliestTargetTime, timeDomain);
}
}
@Override
public void prefetchShouldFire(StateAccessor<?> state) {
state.access(DELAYED_UNTIL_TAG).readLater();
}
@Override
public void clear(TriggerContext c) throws Exception {
c.state().access(DELAYED_UNTIL_TAG).clear();
}
@Override
public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exception {
Instant delayedUntil = context.state().access(DELAYED_UNTIL_TAG).read();
return delayedUntil != null
&& getCurrentTime(context) != null
&& getCurrentTime(context).isAfter(delayedUntil);
}
@Override
public final void onFire(TriggerContext context) throws Exception {
clear(context);
context.trigger().setFinished(true);
}
protected Instant computeTargetTimestamp(Instant time) {
Instant result = time;
for (SerializableFunction<Instant, Instant> timestampMapper : timestampMappers) {
result = timestampMapper.apply(result);
}
return result;
}
/** A {@link SerializableFunction} to delay the timestamp at which this triggers fires. */
static final class DelayFn implements SerializableFunction<Instant, Instant> {
private final Duration delay;
public DelayFn(Duration delay) {
this.delay = delay;
}
@Override
public Instant apply(Instant input) {
return input.plus(delay);
}
@Override
public boolean equals(Object object) {
if (object == this) {
return true;
}
if (!(object instanceof DelayFn)) {
return false;
}
return this.delay.equals(((DelayFn) object).delay);
}
@Override
public int hashCode() {
return Objects.hash(delay);
}
@Override
public String toString() {
return PERIOD_FORMATTER.print(delay.toPeriod());
}
}
/** A {@link SerializableFunction} to align an instant to the nearest interval boundary. */
static final class AlignFn implements SerializableFunction<Instant, Instant> {
private final Duration size;
private final Instant offset;
/**
* Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater
* than the timestamp.
*/
public AlignFn(Duration size, Instant offset) {
this.size = size;
this.offset = offset;
}
@Override
public Instant apply(Instant point) {
long millisSinceStart = new Duration(offset, point).getMillis() % size.getMillis();
return millisSinceStart == 0 ? point : point.plus(size).minus(millisSinceStart);
}
@Override
public boolean equals(Object object) {
if (object == this) {
return true;
}
if (!(object instanceof AlignFn)) {
return false;
}
AlignFn other = (AlignFn) object;
return other.size.equals(this.size) && other.offset.equals(this.offset);
}
@Override
public int hashCode() {
return Objects.hash(size, offset);
}
}
}