| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.apex.malhar.lib.window.impl; |
| |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.Map; |
| |
| import org.apache.apex.malhar.lib.state.spillable.SpillableSetMultimapImpl; |
| import org.apache.apex.malhar.lib.util.KeyValPair; |
| import org.apache.apex.malhar.lib.window.Accumulation; |
| import org.apache.apex.malhar.lib.window.SessionWindowedStorage; |
| import org.apache.apex.malhar.lib.window.TriggerOption; |
| import org.apache.apex.malhar.lib.window.Tuple; |
| import org.apache.apex.malhar.lib.window.Window; |
| import org.apache.apex.malhar.lib.window.WindowOption; |
| import org.apache.apex.malhar.lib.window.WindowState; |
| import org.apache.apex.malhar.lib.window.WindowedStorage; |
| import org.apache.hadoop.classification.InterfaceStability; |
| |
| import com.datatorrent.api.Context; |
| |
| /** |
| * This is an implementation of WindowedOperator that takes in key value pairs as input and gives out key value pairs |
| * as output. If your operation is not key based, please use {@link WindowedOperatorImpl}. |
| * |
| * @param <KeyT> The type of the key of both the input and the output tuple |
| * @param <InputValT> The type of the value of the keyed input tuple |
| * @param <AccumT> The type of the accumulated value in the operator state per key per window |
| * @param <OutputValT> The type of the value of the keyed output tuple |
| * |
| * @since 3.5.0 |
| */ |
| @InterfaceStability.Evolving |
| public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT> |
| extends AbstractWindowedOperator<KeyValPair<KeyT, InputValT>, KeyValPair<KeyT, OutputValT>, WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, WindowedStorage.WindowedKeyedStorage<KeyT, OutputValT>, Accumulation<? super InputValT, AccumT, OutputValT>> |
| { |
| private SpillableSetMultimapImpl<Window, KeyT> updatedKeyStorage; |
| |
| @Override |
| public void setup(Context.OperatorContext context) |
| { |
| if (useUpdatedKeyStorage()) { |
| updatedKeyStorage.getStore().setup(context); |
| updatedKeyStorage.setup(context); |
| } |
| |
| super.setup(context); |
| } |
| |
| @Override |
| protected <T> Collection<Window.SessionWindow> assignSessionWindows(long timestamp, Tuple<T> inputTuple) |
| { |
| if (!(inputTuple.getValue() instanceof KeyValPair)) { |
| throw new UnsupportedOperationException("Session window require keyed tuples"); |
| } else { |
| KeyT key = ((KeyValPair<KeyT, ?>)inputTuple.getValue()).getKey(); |
| WindowOption.SessionWindows sessionWindowOption = (WindowOption.SessionWindows)windowOption; |
| SessionWindowedStorage<KeyT, AccumT> sessionStorage = (SessionWindowedStorage<KeyT, AccumT>)dataStorage; |
| long minGapMillis = sessionWindowOption.getMinGap().getMillis(); |
| Collection<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> sessionEntries = sessionStorage.getSessionEntries(key, timestamp, minGapMillis); |
| Window.SessionWindow<KeyT> sessionWindowToAssign; |
| switch (sessionEntries.size()) { |
| case 0: { |
| // There are no existing windows within the minimum gap. Create a new session window |
| Window.SessionWindow<KeyT> sessionWindow = new Window.SessionWindow<>(key, timestamp, minGapMillis); |
| windowStateMap.put(sessionWindow, new WindowState()); |
| sessionWindowToAssign = sessionWindow; |
| break; |
| } |
| case 1: { |
| // There is already one existing window within the minimum gap. See whether we need to extend the time of that window |
| Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry = sessionEntries.iterator().next(); |
| Window.SessionWindow<KeyT> sessionWindow = sessionWindowEntry.getKey(); |
| if (sessionWindow.getBeginTimestamp() <= timestamp && timestamp + minGapMillis <= sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis()) { |
| // The session window already covers the event |
| sessionWindowToAssign = sessionWindow; |
| } else { |
| // The session window does not cover the event but is within the min gap |
| if (triggerOption != null && |
| triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) { |
| // fire a retraction trigger because the session window will be enlarged |
| fireRetractionTrigger(sessionWindow, false); |
| } |
| // create a new session window that covers the timestamp |
| long newBeginTimestamp = Math.min(sessionWindow.getBeginTimestamp(), timestamp); |
| long newEndTimestamp = Math.max(sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis(), timestamp + minGapMillis); |
| Window.SessionWindow<KeyT> newSessionWindow = |
| new Window.SessionWindow<>(key, newBeginTimestamp, newEndTimestamp - newBeginTimestamp); |
| windowStateMap.remove(sessionWindow); |
| sessionStorage.migrateWindow(sessionWindow, newSessionWindow); |
| windowStateMap.put(newSessionWindow, new WindowState()); |
| sessionWindowToAssign = newSessionWindow; |
| } |
| break; |
| } |
| case 2: { |
| // There are two windows that overlap the proto-session window of the timestamp. We need to merge the two windows |
| Iterator<Map.Entry<Window.SessionWindow<KeyT>, AccumT>> iterator = sessionEntries.iterator(); |
| Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry1 = iterator.next(); |
| Map.Entry<Window.SessionWindow<KeyT>, AccumT> sessionWindowEntry2 = iterator.next(); |
| Window.SessionWindow<KeyT> sessionWindow1 = sessionWindowEntry1.getKey(); |
| Window.SessionWindow<KeyT> sessionWindow2 = sessionWindowEntry2.getKey(); |
| AccumT sessionData1 = sessionWindowEntry1.getValue(); |
| AccumT sessionData2 = sessionWindowEntry2.getValue(); |
| if (triggerOption != null && |
| triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) { |
| // fire a retraction trigger because the two session windows will be merged to a new window |
| fireRetractionTrigger(sessionWindow1, false); |
| fireRetractionTrigger(sessionWindow2, false); |
| } |
| long newBeginTimestamp = Math.min(sessionWindow1.getBeginTimestamp(), sessionWindow2.getBeginTimestamp()); |
| long newEndTimestamp = Math.max(sessionWindow1.getBeginTimestamp() + sessionWindow1.getDurationMillis(), |
| sessionWindow2.getBeginTimestamp() + sessionWindow2.getDurationMillis()); |
| |
| Window.SessionWindow<KeyT> newSessionWindow = new Window.SessionWindow<>(key, newBeginTimestamp, newEndTimestamp - newBeginTimestamp); |
| AccumT newSessionData = accumulation.merge(sessionData1, sessionData2); |
| sessionStorage.remove(sessionWindow1); |
| sessionStorage.remove(sessionWindow2); |
| sessionStorage.put(newSessionWindow, key, newSessionData); |
| windowStateMap.remove(sessionWindow1); |
| windowStateMap.remove(sessionWindow2); |
| windowStateMap.put(newSessionWindow, new WindowState()); |
| sessionWindowToAssign = newSessionWindow; |
| break; |
| } |
| default: |
| throw new IllegalStateException("There are more than two sessions matching one timestamp"); |
| } |
| return Collections.<Window.SessionWindow>singleton(sessionWindowToAssign); |
| } |
| } |
| |
| @Override |
| public void endWindow() |
| { |
| super.endWindow(); |
| if (useUpdatedKeyStorage()) { |
| updatedKeyStorage.endWindow(); |
| } |
| } |
| |
| private boolean useUpdatedKeyStorage() |
| { |
| return updatedKeyStorage != null && isFiringOnlyUpdatedPanes(); |
| } |
| |
| @Override |
| public void accumulateTuple(Tuple.WindowedTuple<KeyValPair<KeyT, InputValT>> tuple) |
| { |
| KeyValPair<KeyT, InputValT> kvData = tuple.getValue(); |
| KeyT key = kvData.getKey(); |
| for (Window window : tuple.getWindows()) { |
| // process each window |
| AccumT accum = dataStorage.get(window, key); |
| if (accum == null) { |
| accum = accumulation.defaultAccumulatedValue(); |
| } |
| |
| InputValT inputValue = kvData.getValue(); |
| AccumT newValue = accumulation.accumulate(accum, inputValue); |
| if ((earlyTriggerMillis > 0 || lateTriggerMillis > 0 || earlyTriggerCount > 0 || lateTriggerCount > 0) && useUpdatedKeyStorage()) { |
| updatedKeyStorage.put(window, key); |
| } |
| |
| dataStorage.put(window, key, newValue); |
| } |
| } |
| |
| @Override |
| public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes) |
| { |
| if (useUpdatedKeyStorage()) { |
| for (KeyT key : updatedKeyStorage.get(window)) { |
| OutputValT outputVal = accumulation.getOutput(dataStorage.get(window, key)); |
| if (retractionStorage != null) { |
| OutputValT oldValue = retractionStorage.get(window, key); |
| if (oldValue != null && oldValue.equals(outputVal)) { |
| continue; |
| } |
| } |
| output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(key, outputVal))); |
| if (retractionStorage != null) { |
| retractionStorage.put(window, key, outputVal); |
| } |
| } |
| updatedKeyStorage.removeAll(window); |
| } else { |
| for (Map.Entry<KeyT, AccumT> entry : dataStorage.entries(window)) { |
| OutputValT outputVal = accumulation.getOutput(entry.getValue()); |
| |
| output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), outputVal))); |
| if (retractionStorage != null) { |
| retractionStorage.put(window, entry.getKey(), outputVal); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void fireRetractionTrigger(Window window, boolean fireOnlyUpdatedPanes) |
| { |
| if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) { |
| throw new UnsupportedOperationException(); |
| } |
| for (Map.Entry<KeyT, OutputValT> entry : retractionStorage.entries(window)) { |
| if (fireOnlyUpdatedPanes) { |
| AccumT currentAccum = dataStorage.get(window, entry.getKey()); |
| if (currentAccum != null) { |
| OutputValT currentValue = accumulation.getOutput(currentAccum); |
| if (currentValue != null && currentValue.equals(entry.getValue())) { |
| continue; |
| } |
| } |
| } |
| output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), accumulation.getRetraction(entry.getValue())))); |
| } |
| } |
| |
| public SpillableSetMultimapImpl<Window, KeyT> getUpdatedKeyStorage() |
| { |
| return updatedKeyStorage; |
| } |
| |
| public void setUpdatedKeyStorage(SpillableSetMultimapImpl<Window, KeyT> updatedKeyStorage) |
| { |
| this.updatedKeyStorage = updatedKeyStorage; |
| } |
| |
| } |