blob: 97fb7b7d813e9a6f8577a2e9434d6398c6cef7fb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.operators.triggers;
import org.apache.samza.annotation.InterfaceStability;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* API for creating {@link Trigger} instances to be used with a {@link org.apache.samza.operators.windows.Window}.
*
* <p> The below example groups an input into tumbling windows of 10s and emits early results periodically every 4s in
* processing time, or for every 50 messages. It also specifies that window results are accumulating.
*
* <pre> {@code
* MessageStream<> windowedStream = stream.window(Windows.tumblingWindow(Duration.of(10, TimeUnit.SECONDS))
* .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.of(4, TimeUnit.SECONDS))))))
* .setAccumulationMode(AccumulationMode.ACCUMULATING));
* }</pre>
*
*/
@InterfaceStability.Unstable
public final class Triggers {
private Triggers() { }
/**
* Creates a {@link Trigger} that fires when the number of messages in the pane
* reaches the specified count.
*
* @param count the number of messages to fire the trigger after
* @param <M> the type of input message in the window
* @return the created trigger
*/
public static <M> Trigger<M> count(long count) {
return new CountTrigger<M>(count);
}
/**
* Creates a trigger that fires after the specified duration has passed since the first message in
* the pane.
*
* @param duration the duration since the first element
* @param <M> the type of input message in the window
* @return the created trigger
*/
public static <M> Trigger<M> timeSinceFirstMessage(Duration duration) {
return new TimeSinceFirstMessageTrigger<M>(duration);
}
/**
* Creates a trigger that fires when there is no new message for the specified duration in the pane.
*
* @param duration the duration since the last element
* @param <M> the type of input message in the window
* @return the created trigger
*/
public static <M> Trigger<M> timeSinceLastMessage(Duration duration) {
return new TimeSinceLastMessageTrigger<M>(duration);
}
/**
* Creates a trigger that fires when any of the provided triggers fire.
*
* @param triggers the individual triggers
* @param <M> the type of input message in the window
* @return the created trigger
*/
public static <M> Trigger<M> any(Trigger<M>... triggers) {
List<Trigger<M>> triggerList = new ArrayList<>();
for (Trigger trigger : triggers) {
triggerList.add(trigger);
}
return new AnyTrigger<M>(Collections.unmodifiableList(triggerList));
}
/**
* Repeats the provided trigger forever.
*
* <p>Creating a {@link RepeatingTrigger} from an {@link AnyTrigger} is equivalent to creating an {@link AnyTrigger} from
* its individual {@link RepeatingTrigger}s.
*
* @param trigger the individual trigger to repeat
* @param <M> the type of input message in the window
* @return the created trigger
*/
public static <M> Trigger<M> repeat(Trigger<M> trigger) {
return new RepeatingTrigger<>(trigger);
}
}