| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.operators.windows; |
| |
| import org.apache.samza.annotation.InterfaceStability; |
| import org.apache.samza.operators.functions.FoldLeftFunction; |
| import org.apache.samza.operators.functions.MapFunction; |
| import org.apache.samza.operators.functions.SupplierFunction; |
| import org.apache.samza.operators.triggers.TimeTrigger; |
| import org.apache.samza.operators.triggers.Trigger; |
| import org.apache.samza.operators.triggers.Triggers; |
| import org.apache.samza.operators.windows.internal.WindowInternal; |
| import org.apache.samza.operators.windows.internal.WindowType; |
| import org.apache.samza.serializers.Serde; |
| |
| import java.time.Duration; |
| import java.util.Collection; |
| |
| /** |
| * APIs for creating different types of {@link Window}s. |
| * |
| * Groups incoming messages in a {@link org.apache.samza.operators.MessageStream} into finite windows for processing. |
| * |
| * <p> Each window is uniquely identified by its {@link WindowKey}. A window can have one or more associated |
| * {@link Trigger}s that determine when results from the {@link Window} are emitted. Each emitted result contains one |
| * or more messages in the window and is called a {@link WindowPane}. |
| * |
| * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data |
| * for the window has arrived, or late triggers that allow handling late arrivals of data. |
| * <pre> |
| * window wk1 |
| * +--------------------------------+ |
| * ------------+--------+-----------+ |
| * | | | | |
| * | pane 1 |pane2 | pane3 | |
| * +-----------+--------+-----------+ |
| * |
| * ----------------------------------- |
| * incoming message stream ------+ |
| * ----------------------------------- |
| * window wk2 |
| * +---------------------+---------+ |
| * | pane 1| pane 2 | pane 3 | |
| * | | | | |
| * +---------+-----------+---------+ |
| * |
| * window wk3 |
| * +----------+-----------+---------+ |
| * | | | | |
| * | pane 1 | pane 2 | pane 3| |
| * | | | | |
| * +----------+-----------+---------+ |
| * |
| * </pre> |
| * <p> A {@link Window} can be one of the following types: |
| * <ul> |
| * <li> |
| * Tumbling Window: A tumbling window defines a series of non-overlapping, fixed size, contiguous intervals. |
| * <li> |
| * Session Window: A session window groups a {@link org.apache.samza.operators.MessageStream} into sessions. |
| * A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}. |
| * The boundary for a session is defined by a {@code sessionGap}. All messages that that arrive within |
| * the gap are grouped into the same session. |
| * </ul> |
| * |
| * <p> A {@link Window} is said to be "keyed" when the incoming messages are first grouped based on their key |
| * and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants |
| * of the window types above. |
| * |
| * <p> The value for the window can be updated incrementally by providing an {@code initialValue} {@link SupplierFunction} |
| * and an aggregating {@link FoldLeftFunction}. The initial value supplier is invoked every time a new window is |
| * created. The aggregating function is invoked for each incoming message for the window. If these are not provided, |
| * the emitted {@link WindowPane} will contain a collection of messages in the window. |
| * |
| * <p> Time granularity for windows: Currently, time durations are always measured in milliseconds. Time units of |
| * finer granularity are not supported. |
| */ |
| @InterfaceStability.Unstable |
| public final class Windows { |
| |
| private Windows() { } |
| |
| /** |
| * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping processing |
| * time based windows based on the provided keyFn and applies the provided fold function to them. |
| * |
| * <p>The below example computes the maximum value per-key over fixed size 10 second windows. |
| * |
| * <pre> {@code |
| * MessageStream<UserClick> stream = ...; |
| * MapFunction<UserClick, String> keyFn = ...; |
| * SupplierFunction<Integer> initialValue = () -> 0; |
| * FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c); |
| * MessageStream<WindowPane<String, Integer>> windowedStream = stream.window( |
| * Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10), maxAggregator)); |
| * } |
| * </pre> |
| * |
| * @param keyFn the function to extract the window key from a message |
| * @param interval the duration in processing time |
| * @param initialValue the initial value supplier for the aggregator. Invoked when a new window is created. |
| * @param aggregator the function to incrementally update the window value. Invoked when a new message |
| * arrives for the window. |
| * @param keySerde the serde for the window key |
| * @param windowValueSerde the serde for the window value |
| * @param <M> the type of the input message |
| * @param <WV> the type of the {@link WindowPane} output value |
| * @param <K> the type of the key in the {@link Window} |
| * @return the created {@link Window} function. |
| */ |
| public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(MapFunction<? super M, ? extends K> keyFn, Duration interval, |
| SupplierFunction<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator, Serde<K> keySerde, |
| Serde<WV> windowValueSerde) { |
| |
| Trigger<M> defaultTrigger = new TimeTrigger<>(interval); |
| return new WindowInternal<>(defaultTrigger, (SupplierFunction<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator, |
| (MapFunction<M, K>) keyFn, null, WindowType.TUMBLING, keySerde, windowValueSerde, null); |
| } |
| |
| /** |
| * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping |
| * processing time based windows using the provided keyFn. |
| * |
| * <p>The below example groups the stream into fixed-size 10 second windows for each key. |
| * |
| * <pre> {@code |
| * MessageStream<UserClick> stream = ...; |
| * Function<UserClick, String> keyFn = ...; |
| * MessageStream<WindowPane<String, Collection<UserClick>>> windowedStream = stream.window( |
| * Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10))); |
| * } |
| * </pre> |
| * |
| * @param keyFn function to extract key from the message |
| * @param interval the duration in processing time |
| * @param keySerde the serde for the window key |
| * @param msgSerde the serde for the input message |
| * @param <M> the type of the input message |
| * @param <K> the type of the key in the {@link Window} |
| * @return the created {@link Window} function |
| */ |
| public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(MapFunction<M, K> keyFn, Duration interval, |
| Serde<K> keySerde, Serde<M> msgSerde) { |
| |
| Trigger<M> defaultTrigger = new TimeTrigger<>(interval); |
| return new WindowInternal<>(defaultTrigger, null, null, keyFn, null, WindowType.TUMBLING, |
| keySerde, null, msgSerde); |
| } |
| |
| /** |
| * Creates a {@link Window} that windows values into fixed-size processing time based windows and aggregates |
| * them applying the provided function. |
| * |
| * <p>The below example computes the maximum value per-key over fixed size 10 second windows. |
| * |
| * <pre> {@code |
| * MessageStream<String> stream = ...; |
| * SupplierFunction<Integer> initialValue = () -> 0; |
| * FoldLeftFunction<String, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c); |
| * MessageStream<WindowPane<Void, Integer>> windowedStream = stream.window( |
| * Windows.tumblingWindow(Duration.ofSeconds(10), maxAggregator)); |
| * } |
| * </pre> |
| * |
| * @param interval the duration in processing time |
| * @param initialValue the initial value supplier for the aggregator. Invoked when a new window is created. |
| * @param aggregator the function to incrementally update the window value. Invoked when a new message |
| * arrives for the window. |
| * @param windowValueSerde the serde for the window value |
| * @param <M> the type of the input message |
| * @param <WV> the type of the {@link WindowPane} output value |
| * @return the created {@link Window} function |
| */ |
| public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration interval, SupplierFunction<? extends WV> initialValue, |
| FoldLeftFunction<? super M, WV> aggregator, Serde<WV> windowValueSerde) { |
| Trigger<M> defaultTrigger = new TimeTrigger<>(interval); |
| return new WindowInternal<>(defaultTrigger, (SupplierFunction<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator, |
| null, null, WindowType.TUMBLING, null, windowValueSerde, null); |
| } |
| |
| /** |
| * |
| * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping |
| * processing time based windows. |
| * |
| * <p>The below example groups the stream into fixed-size 10 second windows and computes a windowed-percentile. |
| * |
| * <pre> {@code |
| * MessageStream<Long> stream = ...; |
| * Function<Collection<Long>, Long> percentile99 = .. |
| * |
| * MessageStream<WindowPane<Void, Collection<Long>>> windowedStream = |
| * integerStream.window(Windows.tumblingWindow(Duration.ofSeconds(10))); |
| * MessageStream<Long> windowedPercentiles = |
| * windowedStream.map(windowPane -> percentile99(windowPane.getMessage()); |
| * } |
| * </pre> |
| * |
| * @param duration the duration in processing time |
| * @param msgSerde the serde for the input message |
| * @param <M> the type of the input message |
| * @return the created {@link Window} function |
| */ |
| public static <M> Window<M, Void, Collection<M>> tumblingWindow(Duration duration, Serde<M> msgSerde) { |
| Trigger<M> defaultTrigger = new TimeTrigger<>(duration); |
| return new WindowInternal<>(defaultTrigger, null, null, null, null, |
| WindowType.TUMBLING, null, null, msgSerde); |
| } |
| |
| /** |
| * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided |
| * {@code sessionGap} and applies the provided fold function to them. |
| * |
| * <p>A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}. |
| * A session is considered complete when no new messages arrive within the {@code sessionGap}. All messages |
| * that arrive within the gap are grouped into the same session. |
| * |
| * <p>The below example computes the maximum value per-key over a session window of gap 10 seconds. |
| * |
| * <pre> {@code |
| * MessageStream<UserClick> stream = ...; |
| * SupplierFunction<Integer> initialValue = () -> 0; |
| * FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c); |
| * Function<UserClick, String> userIdExtractor = m -> m.getUserId()..; |
| * MessageStream<WindowPane<String, Integer>> windowedStream = stream.window( |
| * Windows.keyedSessionWindow(userIdExtractor, Duration.minute(1), maxAggregator)); |
| * } |
| * </pre> |
| * |
| * @param keyFn the function to extract the window key from a message |
| * @param sessionGap the timeout gap for defining the session |
| * @param initialValue the initial value supplier for the aggregator. Invoked when a new window is created. |
| * @param aggregator the function to incrementally update the window value. Invoked when a new message |
| * arrives for the window. |
| * @param keySerde the serde for the window key |
| * @param windowValueSerde the serde for the window value |
| * @param <M> the type of the input message |
| * @param <K> the type of the key in the {@link Window} |
| * @param <WV> the type of the output value in the {@link WindowPane} |
| * @return the created {@link Window} function |
| */ |
| public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(MapFunction<? super M, ? extends K> keyFn, |
| Duration sessionGap, SupplierFunction<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator, |
| Serde<K> keySerde, Serde<WV> windowValueSerde) { |
| Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap); |
| return new WindowInternal<>(defaultTrigger, (SupplierFunction<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator, |
| (MapFunction<M, K>) keyFn, null, WindowType.SESSION, keySerde, windowValueSerde, null); |
| } |
| |
| /** |
| * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided |
| * {@code sessionGap}. |
| * |
| * <p>A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}. The |
| * boundary for the session is defined by a {@code sessionGap}. All messages that that arrive within |
| * the gap are grouped into the same session. |
| * |
| * <p>The below example groups the stream into per-key session windows of gap 10 seconds. |
| * |
| * <pre> {@code |
| * MessageStream<UserClick> stream = ...; |
| * SupplierFunction<Integer> initialValue = () -> 0; |
| * FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseIntField(m), c); |
| * Function<UserClick, String> userIdExtractor = m -> m.getUserId()..; |
| * MessageStream<WindowPane<String>, Collection<M>> windowedStream = stream.window( |
| * Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10))); |
| * } |
| * </pre> |
| * |
| * @param keyFn the function to extract the window key from a message} |
| * @param sessionGap the timeout gap for defining the session |
| * @param keySerde the serde for the window key |
| * @param msgSerde the serde for the input message |
| * @param <M> the type of the input message |
| * @param <K> the type of the key in the {@link Window} |
| * @return the created {@link Window} function |
| */ |
| public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(MapFunction<? super M, ? extends K> keyFn, |
| Duration sessionGap, Serde<K> keySerde, Serde<M> msgSerde) { |
| Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap); |
| return new WindowInternal<>(defaultTrigger, null, null, (MapFunction<M, K>) keyFn, |
| null, WindowType.SESSION, keySerde, null, msgSerde); |
| } |
| } |