blob: ea7620b99055c98e826c727e2efe4fe6794c62f7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kafka;
import java.util.Optional;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.Duration;
import org.joda.time.Instant;
/**
* A policy for custom record timestamps where timestamps within a partition are expected to be
* roughly monotonically increasing with a cap on out of order event delays (say 1 minute). The
* watermark at any time is '({@code Min(now(), Max(event timestamp so far)) - max delay})'.
* However, the watermark is never set in the future and is capped to 'now - max delay'. In
* addition, the watermark is advanced to 'now - max delay' when a partition is idle.
*/
public class CustomTimestampPolicyWithLimitedDelay<K, V> extends TimestampPolicy<K, V> {

  /** Amount subtracted from the tracked maximum event timestamp to obtain the watermark. */
  private final Duration maxDelay;

  /** Extracts the event timestamp from a Kafka record. */
  private final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction;

  /**
   * Largest timestamp returned by {@link #timestampFunction} so far, or the seed value derived
   * from the previous watermark when no larger timestamp has been observed yet.
   */
  private Instant maxEventTimestamp;

  /**
   * Creates a policy for custom record timestamps that are expected to be roughly monotonically
   * increasing, with out of order delays smaller than {@code maxDelay}. The watermark at any time
   * is {@code Min(now(), max_event_timestamp) - maxDelay}.
   *
   * @param timestampFunction A function to extract timestamp from the record
   * @param maxDelay For any record in the Kafka partition, the timestamp of any subsequent record
   *     is expected to be after {@code current record timestamp - maxDelay}.
   * @param previousWatermark Latest check-pointed watermark, see {@link
   *     TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)}
   */
  public CustomTimestampPolicyWithLimitedDelay(
      SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction,
      Duration maxDelay,
      Optional<Instant> previousWatermark) {
    this.timestampFunction = timestampFunction;
    this.maxDelay = maxDelay;

    // The restored watermark is not a max event timestamp (e.g. it could have been in the
    // future). Seed the field with 'watermark + maxDelay' so that subtracting maxDelay again,
    // before any record is read, yields the restored watermark.
    Instant restoredWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
    this.maxEventTimestamp = restoredWatermark.plus(maxDelay);
  }

  /**
   * Returns the timestamp extracted from {@code record} and remembers it if it is the largest
   * timestamp seen so far.
   */
  @Override
  public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, V> record) {
    Instant eventTime = timestampFunction.apply(record);
    if (!eventTime.isAfter(maxEventTimestamp)) {
      return eventTime;
    }
    maxEventTimestamp = eventTime;
    return eventTime;
  }

  /**
   * Returns the current watermark, evaluated against the current wall-clock time.
   *
   * <p>The watermark is {@code maxEventTimestamp - maxDelay}, except in two special cases:
   *
   * <ul>
   *   <li>(a) the max event timestamp is in the future, probably due to incorrect timestamps: it
   *       is capped to 'now'.
   *   <li>(b) the partition is idle: the watermark needs to advance even though there are no
   *       records in the partition. Future records are assumed to have a timestamp of at least
   *       'now - maxDelay' and the watermark is advanced accordingly.
   * </ul>
   *
   * <p>This handles the majority of common use cases for custom timestamps. Users can implement
   * their own policy if this does not work.
   */
  @Override
  public Instant getWatermark(PartitionContext ctx) {
    return getWatermark(ctx, Instant.now());
  }

  /**
   * Computes the watermark relative to an explicitly supplied {@code now}.
   *
   * @param ctx source of the partition's message backlog and backlog check time
   * @param now the instant treated as the current time
   * @return the watermark for the partition
   */
  @VisibleForTesting
  Instant getWatermark(PartitionContext ctx, Instant now) {
    // Case (a): the tracked maximum is ahead of 'now'; cap it to 'now'.
    if (maxEventTimestamp.isAfter(now)) {
      return now.minus(maxDelay);
    }

    // Case (b): no backlog, the backlog check time has moved more than maxDelay past the tracked
    // maximum, and the tracked maximum is positive (per the original author: at least one record
    // with a positive timestamp has been read).
    boolean partitionIdle =
        ctx.getMessageBacklog() == 0
            && ctx.getBacklogCheckTime().minus(maxDelay).isAfter(maxEventTimestamp)
            && maxEventTimestamp.getMillis() > 0;
    if (partitionIdle) {
      return ctx.getBacklogCheckTime().minus(maxDelay);
    }

    // Default: trail the largest observed event timestamp by maxDelay.
    return maxEventTimestamp.minus(maxDelay);
  }
}