// blob: 266577f5188c6e0476dfdc0accce843e9958efce [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.window;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.joda.time.Duration;
import org.apache.hadoop.classification.InterfaceStability;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
/**
 * Describes how triggers should be fired for each window.
 *
 * <p>For each window, a trigger can be fired before the watermark ({@link Type#EARLY}), at the
 * watermark ({@link Type#ON_TIME}), or after the watermark ({@link Type#LATE}).
 * If a LATE trigger is specified and the accumulation mode is ACCUMULATING, it is important for the
 * WindowOption to specify the allowed lateness because otherwise, all states must be kept in storage.
 *
 * <p>Instances are built fluently: start with {@link #AtWatermark()} and chain the {@code with...} and
 * accumulation-mode methods, each of which mutates this instance and returns it.
 *
 * @since 3.5.0
 */
@InterfaceStability.Evolving
public class TriggerOption
{
  /**
   * What happens to a window's state when a trigger fires.
   */
  public enum AccumulationMode
  {
    /** The state is discarded after each trigger. */
    DISCARDING,
    /** The state is kept across triggers. */
    ACCUMULATING,
    /**
     * The state is kept, and a snapshot of it is saved after each trigger so that a retraction
     * trigger can be fired with that snapshot when new values change the state.
     */
    ACCUMULATING_AND_RETRACTING
  }

  /**
   * Whether the trigger should be fired before the watermark, at the watermark, or after the watermark
   */
  public enum Type
  {
    EARLY,
    ON_TIME,
    LATE
  }

  /**
   * This class represents the individual trigger spec.
   */
  public static class Trigger
  {
    protected Type type;

    private Trigger()
    {
      // for kryo
    }

    /**
     * Creates a trigger spec of the given type.
     *
     * @param type when the trigger fires relative to the watermark; must not be null
     * @throws NullPointerException if {@code type} is null
     */
    Trigger(Type type)
    {
      // Fail fast: a null type would otherwise only surface later, wherever the spec is consumed.
      if (type == null) {
        throw new NullPointerException("type must not be null");
      }
      this.type = type;
    }

    /**
     * Gets when this trigger fires relative to the watermark.
     *
     * @return the trigger type
     */
    public Type getType()
    {
      return type;
    }
  }

  /**
   * This class represents a trigger spec in which triggers are fired at regular time intervals.
   */
  public static class TimeTrigger extends Trigger
  {
    // Bound to Kryo's JavaSerializer instead of the default field serialization.
    @FieldSerializer.Bind(JavaSerializer.class)
    Duration duration;

    private TimeTrigger()
    {
      // for kryo
    }

    /**
     * Creates a time-based trigger spec.
     *
     * @param type when the trigger fires relative to the watermark; must not be null
     * @param duration the interval between firings; must not be null
     * @throws NullPointerException if {@code type} or {@code duration} is null
     */
    public TimeTrigger(Type type, Duration duration)
    {
      super(type);
      // Fail fast: a null duration would otherwise be stored silently and only blow up
      // when a consumer dereferences getDuration().
      if (duration == null) {
        throw new NullPointerException("duration must not be null");
      }
      this.duration = duration;
    }

    /**
     * Gets the interval between firings.
     *
     * @return the duration given at construction
     */
    public Duration getDuration()
    {
      return duration;
    }
  }

  /**
   * This class represents a trigger spec in which triggers are fired every n tuples
   */
  public static class CountTrigger extends Trigger
  {
    private long count;

    private CountTrigger()
    {
      //for kryo
    }

    /**
     * Creates a count-based trigger spec.
     *
     * @param type when the trigger fires relative to the watermark; must not be null
     * @param count the number of tuples between firings; stored as given, without range validation
     * @throws NullPointerException if {@code type} is null
     */
    public CountTrigger(Type type, long count)
    {
      super(type);
      this.count = count;
    }

    /**
     * Gets the number of tuples between firings.
     *
     * @return the count given at construction
     */
    public long getCount()
    {
      return count;
    }
  }

  private AccumulationMode accumulationMode = AccumulationMode.DISCARDING;
  private boolean firingOnlyUpdatedPanes = false;
  // Trigger specs in the order they were added through the builder methods.
  List<Trigger> triggerList = new ArrayList<>();

  /**
   * Creates a TriggerOption with an initial trigger that should be fired at the watermark
   *
   * @return a new TriggerOption whose trigger list holds a single {@link Type#ON_TIME} trigger
   */
  public static TriggerOption AtWatermark()
  {
    TriggerOption triggerOption = new TriggerOption();
    triggerOption.triggerList.add(new Trigger(Type.ON_TIME));
    return triggerOption;
  }

  /**
   * A trigger should be fired before the watermark once for every specified duration
   *
   * @param duration the interval between early firings; must not be null
   * @return this TriggerOption, for chaining
   * @throws NullPointerException if {@code duration} is null
   */
  public TriggerOption withEarlyFiringsAtEvery(Duration duration)
  {
    triggerList.add(new TimeTrigger(Type.EARLY, duration));
    return this;
  }

  /**
   * A trigger should be fired before the watermark once for every n tuple(s)
   *
   * @param count the number of tuples between early firings
   * @return this TriggerOption, for chaining
   */
  public TriggerOption withEarlyFiringsAtEvery(long count)
  {
    triggerList.add(new CountTrigger(Type.EARLY, count));
    return this;
  }

  /**
   * A trigger should be fired after the watermark once for every specified duration
   *
   * @param duration the interval between late firings; must not be null
   * @return this TriggerOption, for chaining
   * @throws NullPointerException if {@code duration} is null
   */
  public TriggerOption withLateFiringsAtEvery(Duration duration)
  {
    triggerList.add(new TimeTrigger(Type.LATE, duration));
    return this;
  }

  /**
   * A trigger should be fired after the watermark once for every n late tuple(s)
   *
   * @param count the number of late tuples between late firings
   * @return this TriggerOption, for chaining
   */
  public TriggerOption withLateFiringsAtEvery(long count)
  {
    triggerList.add(new CountTrigger(Type.LATE, count));
    return this;
  }

  /**
   * With discarding mode, the state is discarded after each trigger
   *
   * @return this TriggerOption, for chaining
   */
  public TriggerOption discardingFiredPanes()
  {
    this.accumulationMode = AccumulationMode.DISCARDING;
    return this;
  }

  /**
   * With accumulating mode, the state is kept
   *
   * @return this TriggerOption, for chaining
   */
  public TriggerOption accumulatingFiredPanes()
  {
    this.accumulationMode = AccumulationMode.ACCUMULATING;
    return this;
  }

  /**
   * With accumulating and retracting mode, the state is kept, and the snapshot of the state is saved after each trigger
   * so when new values come in that change the state, a retraction trigger can be fired with the snapshot of the state
   * when the last trigger was fired
   *
   * @return this TriggerOption, for chaining
   */
  public TriggerOption accumulatingAndRetractingFiredPanes()
  {
    this.accumulationMode = AccumulationMode.ACCUMULATING_AND_RETRACTING;
    return this;
  }

  /**
   * Only fire triggers for data that has changed from the last trigger. This only applies to ACCUMULATING and
   * ACCUMULATING_AND_RETRACTING accumulation modes. This method only sets the flag; it does not check the
   * currently selected accumulation mode.
   *
   * @return this TriggerOption, for chaining
   */
  public TriggerOption firingOnlyUpdatedPanes()
  {
    this.firingOnlyUpdatedPanes = true;
    return this;
  }

  /**
   * Gets the accumulation mode
   *
   * @return the accumulation mode; {@link AccumulationMode#DISCARDING} unless changed
   */
  public AccumulationMode getAccumulationMode()
  {
    return accumulationMode;
  }

  /**
   * Gets the trigger list
   *
   * @return an unmodifiable view of the trigger specs, in the order they were added
   */
  public List<Trigger> getTriggerList()
  {
    return Collections.unmodifiableList(triggerList);
  }

  /**
   * Returns whether we should only fire panes that have been updated since the last trigger.
   * When this option is set, DISCARDING accumulation mode must not be used.
   *
   * @return true if {@link #firingOnlyUpdatedPanes()} has been called on this instance
   */
  public boolean isFiringOnlyUpdatedPanes()
  {
    return this.firingOnlyUpdatedPanes;
  }
}