blob: a76614319e337b83615acdcb3629dee5ff228a15 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ComparisonChain;
import org.joda.time.Instant;
/**
* Encapsulate interaction with time within the execution environment.
*
* <p>This class allows setting and deleting timers, and also retrieving an estimate of the current
* time.
*/
public interface TimerInternals {
/**
* Sets a timer to be fired when the current time in the specified time domain reaches the target
* timestamp.
*
* <p>The combination of {@code namespace} and {@code timerId} uniquely identify a timer.
*
* <p>If a timer is set and then set again before it fires, later settings should clear the prior
* setting.
*
* <p>It is an error to set a timer for two different time domains.
*/
void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain);
/** @deprecated use {@link #setTimer(StateNamespace, String, Instant, TimeDomain)}. */
@Deprecated
void setTimer(TimerData timerData);
/**
* Deletes the given timer.
*
* <p>A timer's ID is enforced to be unique in validation of a {@link DoFn}, but runners often
* manage timers for different time domains in very different ways, thus the {@link TimeDomain} is
* a required parameter.
*/
void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain);
/** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
@Deprecated
void deleteTimer(StateNamespace namespace, String timerId);
/** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
@Deprecated
void deleteTimer(TimerData timerKey);
/** Returns the current timestamp in the {@link TimeDomain#PROCESSING_TIME} time domain. */
Instant currentProcessingTime();
/**
* Returns the current timestamp in the {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} time
* domain or {@code null} if unknown.
*/
@Nullable
Instant currentSynchronizedProcessingTime();
/**
* Return the current, local input watermark timestamp for this computation in the {@link
* TimeDomain#EVENT_TIME} time domain.
*
* <p>This value:
*
* <ol>
* <li>Is never {@literal null}, but may be {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
* <li>Is monotonically increasing.
* <li>May differ between workers due to network and other delays.
* <li>Will never be ahead of the global input watermark for this computation. But it may be
* arbitrarily behind the global input watermark.
* <li>Any element with a timestamp before the local input watermark can be considered 'locally
* late' and be subject to special processing or be dropped entirely.
* </ol>
*
* <p>Note that because the local input watermark can be behind the global input watermark, it is
* possible for an element to be considered locally on-time even though it is globally late.
*/
Instant currentInputWatermarkTime();
/**
* Return the current, local output watermark timestamp for this computation in the {@link
* TimeDomain#EVENT_TIME} time domain. Return {@code null} if unknown.
*
* <p>This value:
*
* <ol>
* <li>Is monotonically increasing.
* <li>Will never be ahead of {@link #currentInputWatermarkTime} as returned above.
* <li>May differ between workers due to network and other delays.
* <li>However will never be behind the global input watermark for any following computation.
* </ol>
*
* <p>In pictures:
*
* <pre>{@code
* | | | | |
* | | D | C | B | A
* | | | | |
* GIWM <= GOWM <= LOWM <= LIWM <= GIWM
* (next stage)
* -------------------------------------------------> event time
* }</pre>
*
* <p>where
*
* <ul>
* <li>LOWM = local output water mark.
* <li>GOWM = global output water mark.
* <li>GIWM = global input water mark.
* <li>LIWM = local input water mark.
* <li>A = A globally on-time element.
* <li>B = A globally late, but locally on-time element.
* <li>C = A locally late element which may still contribute to the timestamp of a pane.
* <li>D = A locally late element which cannot contribute to the timestamp of a pane.
* </ul>
*
* <p>Note that if a computation emits an element which is not before the current output watermark
* then that element will always appear locally on-time in all following computations. However, it
* is possible for an element emitted before the current output watermark to appear locally
* on-time in a following computation. Thus we must be careful to never assume locally late data
* viewed on the output of a computation remains locally late on the input of a following
* computation.
*/
@Nullable
Instant currentOutputWatermarkTime();
/** Data about a timer as represented within {@link TimerInternals}. */
@AutoValue
abstract class TimerData implements Comparable<TimerData> {
public abstract String getTimerId();
public abstract StateNamespace getNamespace();
public abstract Instant getTimestamp();
public abstract TimeDomain getDomain();
// When adding a new field, make sure to add it to the compareTo() method.
/**
* Construct a {@link TimerData} for the given parameters, where the timer ID is automatically
* generated.
*/
public static TimerData of(
String timerId, StateNamespace namespace, Instant timestamp, TimeDomain domain) {
return new AutoValue_TimerInternals_TimerData(timerId, namespace, timestamp, domain);
}
/**
* Construct a {@link TimerData} for the given parameters, where the timer ID is
* deterministically generated from the {@code timestamp} and {@code domain}.
*/
public static TimerData of(StateNamespace namespace, Instant timestamp, TimeDomain domain) {
String timerId =
new StringBuilder()
.append(domain.ordinal())
.append(':')
.append(timestamp.getMillis())
.toString();
return of(timerId, namespace, timestamp, domain);
}
/**
* {@inheritDoc}.
*
* <p>Used for sorting {@link TimerData} by timestamp. Furthermore, we compare timers by all the
* other fields so that {@code compareTo()} only returns 0 when {@code equals()} returns 0. This
* ensures consistent sort order.
*/
@Override
public int compareTo(TimerData that) {
if (this.equals(that)) {
return 0;
}
ComparisonChain chain =
ComparisonChain.start()
.compare(this.getTimestamp(), that.getTimestamp())
.compare(this.getDomain(), that.getDomain())
.compare(this.getTimerId(), that.getTimerId());
if (chain.result() == 0 && !this.getNamespace().equals(that.getNamespace())) {
// Obtaining the stringKey may be expensive; only do so if required
chain = chain.compare(getNamespace().stringKey(), that.getNamespace().stringKey());
}
return chain.result();
}
}
/** A {@link Coder} for {@link TimerData}. */
class TimerDataCoder extends StructuredCoder<TimerData> {
private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
private static final InstantCoder INSTANT_CODER = InstantCoder.of();
private final Coder<? extends BoundedWindow> windowCoder;
public static TimerDataCoder of(Coder<? extends BoundedWindow> windowCoder) {
return new TimerDataCoder(windowCoder);
}
private TimerDataCoder(Coder<? extends BoundedWindow> windowCoder) {
this.windowCoder = windowCoder;
}
@Override
public void encode(TimerData timer, OutputStream outStream) throws CoderException, IOException {
STRING_CODER.encode(timer.getTimerId(), outStream);
STRING_CODER.encode(timer.getNamespace().stringKey(), outStream);
INSTANT_CODER.encode(timer.getTimestamp(), outStream);
STRING_CODER.encode(timer.getDomain().name(), outStream);
}
@Override
public TimerData decode(InputStream inStream) throws CoderException, IOException {
String timerId = STRING_CODER.decode(inStream);
StateNamespace namespace =
StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder);
Instant timestamp = INSTANT_CODER.decode(inStream);
TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
return TimerData.of(timerId, namespace, timestamp, domain);
}
@Override
public List<? extends Coder<?>> getCoderArguments() {
return Arrays.asList(windowCoder);
}
@Override
public void verifyDeterministic() throws NonDeterministicException {
verifyDeterministic(this, "window coder must be deterministic", windowCoder);
}
}
}