| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.apex.malhar.lib.window.impl; |
| |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| |
| import javax.validation.ValidationException; |
| |
| import org.joda.time.Duration; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.apex.malhar.lib.window.Accumulation; |
| import org.apache.apex.malhar.lib.window.ControlTuple; |
| import org.apache.apex.malhar.lib.window.TriggerOption; |
| import org.apache.apex.malhar.lib.window.Tuple; |
| import org.apache.apex.malhar.lib.window.Window; |
| import org.apache.apex.malhar.lib.window.WindowOption; |
| import org.apache.apex.malhar.lib.window.WindowState; |
| import org.apache.apex.malhar.lib.window.WindowedOperator; |
| import org.apache.apex.malhar.lib.window.WindowedStorage; |
| import org.apache.hadoop.classification.InterfaceStability; |
| |
| import com.google.common.base.Function; |
| |
| import com.datatorrent.api.Context; |
| import com.datatorrent.api.DefaultInputPort; |
| import com.datatorrent.api.DefaultOutputPort; |
| import com.datatorrent.api.annotation.InputPortFieldAnnotation; |
| import com.datatorrent.common.util.BaseOperator; |
| |
| /** |
| * This is the abstract windowed operator class that implements most of the windowing, triggering, and accumulating |
| * concepts. The subclass of this abstract class is supposed to provide the implementation of how the accumulated |
| * values are stored in the storage. |
| * |
| * @param <InputT> The type of the input tuple |
| * @param <OutputT> The type of the output tuple |
| * @param <DataStorageT> The type of the data storage |
| * @param <RetractionStorageT> The type of the retraction storage |
| * @param <AccumulationT> The type of the accumulation |
| * |
| * @since 3.5.0 |
| */ |
| @InterfaceStability.Evolving |
| public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT extends WindowedStorage, RetractionStorageT extends WindowedStorage, AccumulationT extends Accumulation> |
| extends BaseOperator implements WindowedOperator<InputT> |
| { |
| |
| protected WindowOption windowOption; |
| protected TriggerOption triggerOption; |
| protected long allowedLatenessMillis = -1; |
| protected WindowedStorage<WindowState> windowStateMap; |
| |
| private Function<InputT, Long> timestampExtractor; |
| |
| private long currentWatermark = -1; |
| private long watermarkTimestamp = -1; |
| private boolean triggerAtWatermark; |
| private long earlyTriggerCount; |
| private long earlyTriggerMillis; |
| private long lateTriggerCount; |
| private long lateTriggerMillis; |
| private long currentDerivedTimestamp = -1; |
| private long windowWidthMillis; |
| private long fixedWatermarkMillis = -1; |
| protected DataStorageT dataStorage; |
| protected RetractionStorageT retractionStorage; |
| protected AccumulationT accumulation; |
| |
| private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedOperator.class); |
| |
| public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>() |
| { |
| @Override |
| public void process(Tuple<InputT> tuple) |
| { |
| processTuple(tuple); |
| } |
| }; |
| |
| // TODO: This port should be removed when Apex Core has native support for custom control tuples |
| @InputPortFieldAnnotation(optional = true) |
| public final transient DefaultInputPort<ControlTuple> controlInput = new DefaultInputPort<ControlTuple>() |
| { |
| @Override |
| public void process(ControlTuple tuple) |
| { |
| if (tuple instanceof ControlTuple.Watermark) { |
| processWatermark((ControlTuple.Watermark)tuple); |
| } |
| } |
| }; |
| |
| // TODO: multiple input ports for join operations |
| |
| public final transient DefaultOutputPort<Tuple.WindowedTuple<OutputT>> output = new DefaultOutputPort<>(); |
| |
| // TODO: This port should be removed when Apex Core has native support for custom control tuples |
| public final transient DefaultOutputPort<ControlTuple> controlOutput = new DefaultOutputPort<>(); |
| |
| /** |
| * Process the incoming data tuple |
| * |
| * @param tuple |
| */ |
| public void processTuple(Tuple<InputT> tuple) |
| { |
| long timestamp = extractTimestamp(tuple); |
| if (isTooLate(timestamp)) { |
| dropTuple(tuple); |
| } else { |
| Tuple.WindowedTuple<InputT> windowedTuple = getWindowedValue(tuple); |
| // do the accumulation |
| accumulateTuple(windowedTuple); |
| |
| for (Window window : windowedTuple.getWindows()) { |
| WindowState windowState = windowStateMap.get(window); |
| windowState.tupleCount++; |
| // process any count based triggers |
| if (windowState.watermarkArrivalTime == -1) { |
| // watermark has not arrived yet, check for early count based trigger |
| if (earlyTriggerCount > 0 && (windowState.tupleCount % earlyTriggerCount) == 0) { |
| fireTrigger(window, windowState); |
| } |
| } else { |
| // watermark has arrived, check for late count based trigger |
| if (lateTriggerCount > 0 && (windowState.tupleCount % lateTriggerCount) == 0) { |
| fireTrigger(window, windowState); |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void setWindowOption(WindowOption windowOption) |
| { |
| this.windowOption = windowOption; |
| if (this.windowOption instanceof WindowOption.GlobalWindow) { |
| windowStateMap.put(Window.GLOBAL_WINDOW, new WindowState()); |
| } |
| } |
| |
| @Override |
| public void setTriggerOption(TriggerOption triggerOption) |
| { |
| this.triggerOption = triggerOption; |
| for (TriggerOption.Trigger trigger : triggerOption.getTriggerList()) { |
| switch (trigger.getType()) { |
| case ON_TIME: |
| triggerAtWatermark = true; |
| break; |
| case EARLY: |
| if (trigger instanceof TriggerOption.TimeTrigger) { |
| earlyTriggerMillis = ((TriggerOption.TimeTrigger)trigger).getDuration().getMillis(); |
| } else if (trigger instanceof TriggerOption.CountTrigger) { |
| earlyTriggerCount = ((TriggerOption.CountTrigger)trigger).getCount(); |
| } |
| break; |
| case LATE: |
| if (trigger instanceof TriggerOption.TimeTrigger) { |
| lateTriggerMillis = ((TriggerOption.TimeTrigger)trigger).getDuration().getMillis(); |
| } else if (trigger instanceof TriggerOption.CountTrigger) { |
| lateTriggerCount = ((TriggerOption.CountTrigger)trigger).getCount(); |
| } |
| break; |
| default: |
| throw new RuntimeException("Unknown trigger type: " + trigger.getType()); |
| } |
| } |
| } |
| |
| @Override |
| public void setAllowedLateness(Duration allowedLateness) |
| { |
| this.allowedLatenessMillis = allowedLateness.getMillis(); |
| } |
| |
| /** |
| * This method sets the storage for the data for each window |
| * |
| * @param storageAgent |
| */ |
| public void setDataStorage(DataStorageT storageAgent) |
| { |
| this.dataStorage = storageAgent; |
| } |
| |
| /** |
| * This method sets the storage for the retraction data for each window. Only used when the accumulation mode is ACCUMULATING_AND_RETRACTING |
| * |
| * @param storageAgent |
| */ |
| public void setRetractionStorage(RetractionStorageT storageAgent) |
| { |
| this.retractionStorage = storageAgent; |
| } |
| |
| /** |
| * Sets the accumulation, which basically tells the WindowedOperator what to do if a new tuple comes in and what |
| * to put in the pane when a trigger is fired |
| * |
| * @param accumulation |
| */ |
| public void setAccumulation(AccumulationT accumulation) |
| { |
| this.accumulation = accumulation; |
| } |
| |
| @Override |
| public void setWindowStateStorage(WindowedStorage<WindowState> storageAgent) |
| { |
| this.windowStateMap = storageAgent; |
| } |
| |
| @Override |
| public void setTimestampExtractor(Function<InputT, Long> timestampExtractor) |
| { |
| this.timestampExtractor = timestampExtractor; |
| } |
| |
| /** |
| * Sets the fixed watermark with respect to the processing time derived from the Apex window ID. This is useful if we |
| * don't have watermark tuples from upstream. However, using this means whether a tuple is considered late totally |
| * depends on the Apex window ID of this operator. |
| * |
| * Note that setting this value will make incoming watermark tuples useless. |
| */ |
| public void setFixedWatermark(long millis) |
| { |
| this.fixedWatermarkMillis = millis; |
| } |
| |
| public void validate() throws ValidationException |
| { |
| if (accumulation == null) { |
| throw new ValidationException("Accumulation must be set"); |
| } |
| if (dataStorage == null) { |
| throw new ValidationException("Data storage must be set"); |
| } |
| if (windowStateMap == null) { |
| throw new ValidationException("Window state storage must be set"); |
| } |
| if (triggerOption != null) { |
| if (triggerOption.isFiringOnlyUpdatedPanes()) { |
| if (retractionStorage == null) { |
| throw new ValidationException("A retraction storage is required for firingOnlyUpdatedPanes option"); |
| } |
| if (triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.DISCARDING) { |
| throw new ValidationException("DISCARDING accumulation mode is not valid for firingOnlyUpdatedPanes option"); |
| } |
| } |
| if (triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING && |
| retractionStorage == null) { |
| throw new ValidationException("A retraction storage is required for ACCUMULATING_AND_RETRACTING accumulation mode"); |
| } |
| } |
| } |
| |
| @Override |
| public Tuple.WindowedTuple<InputT> getWindowedValue(Tuple<InputT> input) |
| { |
| if (windowOption == null && input instanceof Tuple.WindowedTuple) { |
| // inherit the windows from upstream |
| return (Tuple.WindowedTuple<InputT>)input; |
| } |
| Tuple.WindowedTuple<InputT> windowedTuple = new Tuple.WindowedTuple<>(); |
| windowedTuple.setValue(input.getValue()); |
| windowedTuple.setTimestamp(extractTimestamp(input)); |
| assignWindows(windowedTuple.getWindows(), input); |
| return windowedTuple; |
| } |
| |
| private long extractTimestamp(Tuple<InputT> tuple) |
| { |
| if (timestampExtractor == null) { |
| if (tuple instanceof Tuple.TimestampedTuple) { |
| return ((Tuple.TimestampedTuple)tuple).getTimestamp(); |
| } else { |
| return 0; |
| } |
| } else { |
| return timestampExtractor.apply(tuple.getValue()); |
| } |
| } |
| |
| private void assignWindows(List<Window> windows, Tuple<InputT> inputTuple) |
| { |
| if (windowOption instanceof WindowOption.GlobalWindow) { |
| windows.add(Window.GLOBAL_WINDOW); |
| } else { |
| long timestamp = extractTimestamp(inputTuple); |
| if (windowOption instanceof WindowOption.TimeWindows) { |
| |
| for (Window.TimeWindow window : getTimeWindowsForTimestamp(timestamp)) { |
| if (!windowStateMap.containsWindow(window)) { |
| windowStateMap.put(window, new WindowState()); |
| } |
| windows.add(window); |
| } |
| } else if (windowOption instanceof WindowOption.SessionWindows) { |
| assignSessionWindows(windows, timestamp, inputTuple); |
| } |
| } |
| } |
| |
| protected void assignSessionWindows(List<Window> windows, long timestamp, Tuple<InputT> inputTuple) |
| { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** |
| * Returns the list of windows TimeWindows for the given timestamp. |
| * If we are doing sliding windows, this will return multiple windows. Otherwise, only one window will be returned. |
| * Note that this method does not apply to SessionWindows. |
| * |
| * @param timestamp |
| * @return |
| */ |
| private List<Window.TimeWindow> getTimeWindowsForTimestamp(long timestamp) |
| { |
| List<Window.TimeWindow> windows = new ArrayList<>(); |
| if (windowOption instanceof WindowOption.TimeWindows) { |
| long durationMillis = ((WindowOption.TimeWindows)windowOption).getDuration().getMillis(); |
| long beginTimestamp = timestamp - timestamp % durationMillis; |
| windows.add(new Window.TimeWindow(beginTimestamp, durationMillis)); |
| if (windowOption instanceof WindowOption.SlidingTimeWindows) { |
| long slideBy = ((WindowOption.SlidingTimeWindows)windowOption).getSlideByDuration().getMillis(); |
| // add the sliding windows front and back |
| // Note: this messes up the order of the window and we might want to revisit this if the order of the windows |
| // matter |
| for (long slideBeginTimestamp = beginTimestamp - slideBy; |
| slideBeginTimestamp <= timestamp && timestamp < slideBeginTimestamp + durationMillis; |
| slideBeginTimestamp -= slideBy) { |
| windows.add(new Window.TimeWindow(slideBeginTimestamp, durationMillis)); |
| } |
| for (long slideBeginTimestamp = beginTimestamp + slideBy; |
| slideBeginTimestamp <= timestamp && timestamp < slideBeginTimestamp + durationMillis; |
| slideBeginTimestamp += slideBy) { |
| windows.add(new Window.TimeWindow(slideBeginTimestamp, durationMillis)); |
| } |
| } |
| } else { |
| throw new IllegalStateException("Unexpected WindowOption"); |
| } |
| return windows; |
| } |
| |
| @Override |
| public boolean isTooLate(long timestamp) |
| { |
| return allowedLatenessMillis < 0 ? false : (timestamp < currentWatermark - allowedLatenessMillis); |
| } |
| |
| @Override |
| public void dropTuple(Tuple<InputT> input) |
| { |
| // do nothing |
| LOG.debug("Dropping late tuple {}", input); |
| } |
| |
| @Override |
| public void processWatermark(ControlTuple.Watermark watermark) |
| { |
| this.watermarkTimestamp = watermark.getTimestamp(); |
| } |
| |
| @Override |
| public void setup(Context.OperatorContext context) |
| { |
| this.windowWidthMillis = context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS); |
| validate(); |
| } |
| |
| /** |
| * This is for the Apex streaming/application window. Do not confuse this with the windowing concept in this operator |
| */ |
| @Override |
| public void beginWindow(long windowId) |
| { |
| if (currentDerivedTimestamp == -1) { |
| currentDerivedTimestamp = ((windowId >> 32) * 1000) + (windowId & 0xffffffffL); |
| } else { |
| currentDerivedTimestamp += windowWidthMillis; |
| } |
| watermarkTimestamp = -1; |
| } |
| |
| /** |
| * This is for the Apex streaming/application window. Do not confuse this with the windowing concept in this operator |
| */ |
| @Override |
| public void endWindow() |
| { |
| // We only do actual processing of watermark at window boundary so that it will not break idempotency. |
| // TODO: May want to revisit this if the application cares more about latency than idempotency |
| processWatermarkAtEndWindow(); |
| fireTimeTriggers(); |
| } |
| |
| private void processWatermarkAtEndWindow() |
| { |
| if (fixedWatermarkMillis > 0) { |
| watermarkTimestamp = currentDerivedTimestamp - fixedWatermarkMillis; |
| } |
| if (watermarkTimestamp > 0) { |
| this.currentWatermark = watermarkTimestamp; |
| |
| long horizon = watermarkTimestamp - allowedLatenessMillis; |
| |
| for (Iterator<Map.Entry<Window, WindowState>> it = windowStateMap.iterator(); it.hasNext(); ) { |
| Map.Entry<Window, WindowState> entry = it.next(); |
| Window window = entry.getKey(); |
| WindowState windowState = entry.getValue(); |
| if (window.getBeginTimestamp() + window.getDurationMillis() < watermarkTimestamp) { |
| // watermark has not arrived for this window before, marking this window late |
| if (windowState.watermarkArrivalTime == -1) { |
| windowState.watermarkArrivalTime = currentDerivedTimestamp; |
| if (triggerAtWatermark) { |
| // fire trigger at watermark if applicable |
| fireTrigger(window, windowState); |
| } |
| } |
| if (allowedLatenessMillis >= 0 && window.getBeginTimestamp() + window.getDurationMillis() < horizon) { |
| // discard this window because it's too late now |
| it.remove(); |
| dataStorage.remove(window); |
| retractionStorage.remove(window); |
| } |
| } |
| } |
| controlOutput.emit(new WatermarkImpl(watermarkTimestamp)); |
| } |
| } |
| |
| private void fireTimeTriggers() |
| { |
| if (earlyTriggerMillis > 0 || lateTriggerMillis > 0) { |
| for (Map.Entry<Window, WindowState> entry : windowStateMap.entrySet()) { |
| Window window = entry.getKey(); |
| WindowState windowState = entry.getValue(); |
| if (windowState.watermarkArrivalTime == -1) { |
| if (earlyTriggerMillis > 0 && windowState.lastTriggerFiredTime + earlyTriggerMillis <= currentDerivedTimestamp) { |
| // fire early time triggers |
| fireTrigger(window, windowState); |
| } |
| } else { |
| if (lateTriggerMillis > 0 && windowState.lastTriggerFiredTime + lateTriggerMillis <= currentDerivedTimestamp) { |
| // fire late time triggers |
| fireTrigger(window, windowState); |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void fireTrigger(Window window, WindowState windowState) |
| { |
| if (triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) { |
| fireRetractionTrigger(window); |
| } |
| fireNormalTrigger(window, triggerOption.isFiringOnlyUpdatedPanes()); |
| windowState.lastTriggerFiredTime = currentDerivedTimestamp; |
| if (triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.DISCARDING) { |
| clearWindowData(window); |
| } |
| } |
| |
| /** |
| * This method fires the normal trigger for the given window. |
| * |
| * @param window |
| * @param fireOnlyUpdatedPanes Do not fire trigger if the old value is the same as the new value. If true, retraction storage is required. |
| */ |
| public abstract void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes); |
| |
| /** |
| * This method fires the retraction trigger for the given window. This should only be valid if the accumulation |
| * mode is ACCUMULATING_AND_RETRACTING |
| * |
| * @param window |
| */ |
| public abstract void fireRetractionTrigger(Window window); |
| |
| |
| @Override |
| public void clearWindowData(Window window) |
| { |
| dataStorage.remove(window); |
| } |
| |
| } |