| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io.kafka; |
| |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; |
| |
| import java.io.Serializable; |
| import java.util.Optional; |
| import org.apache.beam.sdk.io.kafka.TimestampPolicy.PartitionContext; |
| import org.apache.beam.sdk.transforms.SerializableFunction; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.kafka.common.TopicPartition; |
| import org.joda.time.Duration; |
| import org.joda.time.Instant; |
| |
| /** |
| * An extendable factory to create a {@link TimestampPolicy} for each partition at runtime by |
| * KafkaIO reader. Subclasses implement {@link #createTimestampPolicy}, which is invoked by the the |
| * reader while starting or resuming from a checkpoint. Two commonly used policies are provided. See |
| * {@link #withLogAppendTime()} and {@link #withProcessingTime()}. |
| */ |
| @FunctionalInterface |
| public interface TimestampPolicyFactory<KeyT, ValueT> extends Serializable { |
| |
| /** |
| * Creates a TimestampPolicy for a partition. This is invoked by the reader at the start or while |
| * resuming from previous checkpoint. |
| * |
| * @param tp The returned policy applies to records from this {@link TopicPartition}. |
| * @param previousWatermark The latest check-pointed watermark. This is set when the reader is |
| * resuming from a checkpoint. This is a good value to return by implementations of {@link |
| * TimestampPolicy#getWatermark(PartitionContext)} until a better watermark can be established |
| * as more records are read. |
| */ |
| TimestampPolicy<KeyT, ValueT> createTimestampPolicy( |
| TopicPartition tp, Optional<Instant> previousWatermark); |
| |
| /** |
| * A {@link TimestampPolicy} that assigns processing time to each record. Specifically, this is |
| * the timestamp when the record becomes 'current' in the reader. The watermark aways advances to |
| * current time. |
| */ |
| static <K, V> TimestampPolicyFactory<K, V> withProcessingTime() { |
| return (tp, prev) -> new ProcessingTimePolicy<>(); |
| } |
| |
| /** |
| * A {@link TimestampPolicy} that assigns Kafka's log append time (server side ingestion time) to |
| * each record. The watermark for each Kafka partition is the timestamp of the last record read. |
| * If a partition is idle, the watermark advances roughly to 'current time - 2 seconds'. See |
| * {@link KafkaIO.Read#withLogAppendTime()} for longer description. |
| */ |
| static <K, V> TimestampPolicyFactory<K, V> withLogAppendTime() { |
| return (tp, previousWatermark) -> new LogAppendTimePolicy<>(previousWatermark); |
| } |
| |
| /** |
| * {@link CustomTimestampPolicyWithLimitedDelay} using {@link KafkaTimestampType#CREATE_TIME} from |
| * the record for timestamp. See {@link KafkaIO.Read#withCreateTime(Duration)} for more complete |
| * documentation. |
| */ |
| static <K, V> TimestampPolicyFactory<K, V> withCreateTime(Duration maxDelay) { |
| SerializableFunction<KafkaRecord<K, V>, Instant> timestampFunction = |
| record -> { |
| checkArgument( |
| record.getTimestampType() == KafkaTimestampType.CREATE_TIME, |
| "Kafka record's timestamp is not 'CREATE_TIME' " |
| + "(topic: %s, partition %s, offset %s, timestamp type '%s')", |
| record.getTopic(), |
| record.getPartition(), |
| record.getOffset(), |
| record.getTimestampType()); |
| return new Instant(record.getTimestamp()); |
| }; |
| |
| return (tp, previousWatermark) -> |
| new CustomTimestampPolicyWithLimitedDelay<>(timestampFunction, maxDelay, previousWatermark); |
| } |
| |
| /** |
| * Used by the Read transform to support old timestamp functions API. This exists only to support |
| * other deprecated API {@link KafkaIO.Read#withTimestampFn(SerializableFunction)}.<br> |
| * TODO(rangadi): Make this package private or remove it. It was never meant to be public. |
| * |
| * @deprecated Use @{@link CustomTimestampPolicyWithLimitedDelay}. |
| */ |
| @Deprecated |
| static <K, V> TimestampPolicyFactory<K, V> withTimestampFn( |
| final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) { |
| return (tp, previousWatermark) -> new TimestampFnPolicy<>(timestampFn, previousWatermark); |
| } |
| |
| /** |
| * A simple policy that uses current time for event time and watermark. This should be used when |
| * better timestamps like LogAppendTime are not available for a topic. |
| */ |
| class ProcessingTimePolicy<K, V> extends TimestampPolicy<K, V> { |
| |
| @Override |
| public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<K, V> record) { |
| return Instant.now(); |
| } |
| |
| @Override |
| public Instant getWatermark(PartitionContext context) { |
| return Instant.now(); |
| } |
| } |
| |
| /** |
| * Assigns Kafka's log append time (server side ingestion time) to each record. The watermark for |
| * each Kafka partition is the timestamp of the last record read. If a partition is idle, the |
| * watermark advances roughly to 'current time - 2 seconds'. See {@link |
| * KafkaIO.Read#withLogAppendTime()} for longer description. |
| */ |
| class LogAppendTimePolicy<K, V> extends TimestampPolicy<K, V> { |
| |
| /** |
| * When a partition is idle or caught up (i.e. backlog is zero), we advance the watermark to |
| * near realtime. Kafka server does not have an API to provide server side current timestamp |
| * which could ensure minimum LogAppendTime for future records. The best we could do is to |
| * advance the watermark to 'last backlog check time - small delta to account for any internal |
| * buffering in Kafka'. Using 2 seconds for this delta. Should this be user configurable? |
| */ |
| private static final Duration IDLE_WATERMARK_DELTA = Duration.standardSeconds(2); |
| |
| protected Instant currentWatermark; |
| |
| public LogAppendTimePolicy(Optional<Instant> previousWatermark) { |
| currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE); |
| } |
| |
| @Override |
| public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<K, V> record) { |
| if (record.getTimestampType().equals(KafkaTimestampType.LOG_APPEND_TIME)) { |
| currentWatermark = new Instant(record.getTimestamp()); |
| } else if (currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) { |
| // This is the first record and it does not have LOG_APPEND_TIME. |
| // Most likely the topic is not configured correctly. |
| throw new IllegalStateException( |
| String.format( |
| "LogAppendTimePolicy policy is enabled in reader, but Kafka record's timestamp type " |
| + "is LogAppendTime. Most likely it is not enabled on Kafka for the topic '%s'. " |
| + "Actual timestamp type is '%s'.", |
| record.getTopic(), record.getTimestampType())); |
| } |
| return currentWatermark; |
| } |
| |
| @Override |
| public Instant getWatermark(PartitionContext context) { |
| if (context.getMessageBacklog() == 0) { |
| // The reader is caught up. May need to advance the watermark. |
| Instant idleWatermark = context.getBacklogCheckTime().minus(IDLE_WATERMARK_DELTA); |
| if (idleWatermark.isAfter(currentWatermark)) { |
| currentWatermark = idleWatermark; |
| } |
| } // else, there is backlog (or is unknown). Do not advance the watermark. |
| return currentWatermark; |
| } |
| } |
| |
| /** |
| * Internal policy to support deprecated withTimestampFn API. It returns last record timestamp for |
| * watermark!. |
| */ |
| class TimestampFnPolicy<K, V> extends TimestampPolicy<K, V> { |
| |
| final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn; |
| Instant lastRecordTimestamp; |
| |
| TimestampFnPolicy( |
| SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn, |
| Optional<Instant> previousWatermark) { |
| this.timestampFn = timestampFn; |
| lastRecordTimestamp = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE); |
| } |
| |
| @Override |
| public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<K, V> record) { |
| lastRecordTimestamp = timestampFn.apply(record); |
| return lastRecordTimestamp; |
| } |
| |
| @Override |
| public Instant getWatermark(PartitionContext context) { |
| return lastRecordTimestamp; |
| } |
| } |
| } |