blob: 2c363aab3d2ae9a976e41a8450b4a5e53eb8950c [file] [log] [blame]
/*
* Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.lib.multiwindow;
import java.util.ArrayList;
import javax.validation.constraints.Min;
import com.datatorrent.api.BaseOperator;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
/**
*
* <p>Provides a sliding window class that lets users access both states of all streaming window in current sliding window
* and state of the expired streaming window from last sliding windows. </p>
* <p>
* <b>Properties</b>:<br>
* <b>T</b> is the tuple object the operator accept <br>
* <b>S</b> is the state object kept in the sliding window <br>
* <b>windowSize</b>: Number of streaming window in this sliding window<br>
* <br>
*
* @displayName Abstract Sliding Window
* @category Multi-Window
* @tags sliding window, state
* @since 0.3.3
*/
public abstract class AbstractSlidingWindow<T, S> extends BaseOperator
{
/**
* Input port for getting incoming data.
*/
public final transient DefaultInputPort<T> data = new DefaultInputPort<T>()
{
@Override
public void process(T tuple)
{
processDataTuple(tuple);
}
};
protected ArrayList<S> states = null;
protected S lastExpiredWindowState = null;
protected int currentCursor = -1;
@Min(2)
int windowSize = 2;
/**
* getter function for n (number of previous window states
*
* @return n
*/
@Min(2)
public int getWindowSize()
{
return windowSize;
}
/**
* setter for windowSize
*
* @param i
*/
public void setWindowSize(int windowSize)
{
this.windowSize = windowSize;
}
abstract protected void processDataTuple(T tuple);
/**
* Implement this method to create the state object needs to be kept in the sliding window
*
* @return the state of current streaming window
*/
public abstract S createWindowState();
/**
* Get the Streaming window state in it's coming the order start from 0
*
* @param i
* 0 the state of the first coming streaming window
* -1 the state of the last expired streaming window
* @return State of the streaming window
* @throws ArrayIndexOutOfBoundsException if i >= sliding window size
*/
public S getStreamingWindowState(int i)
{
if(i == -1){
return lastExpiredWindowState;
}
if (i >= getWindowSize()) {
throw new ArrayIndexOutOfBoundsException();
}
int index = (currentCursor + 1 + i) % windowSize ;
return states.get(index);
}
/**
* Moves states by 1 and sets current state to null. If you override
* beginWindow, you must call super.beginWindow(windowId) to ensure proper
* operator behavior.
*
* @param windowId
*/
@Override
public void beginWindow(long windowId)
{
// move currentCursor 1 position
currentCursor = (currentCursor + 1) % windowSize;
// expire the state at the first position which is the state of the streaming window moving out of the current application window
lastExpiredWindowState = states.get(currentCursor);
states.set(currentCursor, createWindowState());
}
/**
* Sets up internal state structure
*
* @param context
*/
@Override
public void setup(OperatorContext context)
{
super.setup(context);
states = new ArrayList<S>(windowSize);
//initialize the sliding window state to null
for (int i = 0; i < windowSize; i++) {
states.add(null);
}
currentCursor = -1;
}
}