/*
 * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.datatorrent.lib.multiwindow;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import javax.validation.constraints.Min;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
import com.datatorrent.lib.util.KeyValPair;

/**
 *
 * Provides a sliding window class that lets users access past N-1 window states where N is a property.
 * <p>
 * The default behavior is just a pass through, i.e. the
 * operator does not do any processing on its own. Users are expected to extend
 * this class and add their specific processing. Users have to define their own
 * output port(s). The tuples are KeyValue pair. This is an abstract class. The
 * concrete class has to provide the interface SlidingWindowObject, which keeps
 * information about each window.<br>
 * This module is end of window as per users choice<br>
 * <br>
 * <b>Ports</b>:<br>
 * <b>data</b>: expects T (any POJO)<br>
 * <br>
 * <b>Properties</b>:<br>
 * <b>windowSize i.e. N</b>: Number of windows to keep state on<br>
 * <br>
 * @displayName Abstract Sliding Window Key Value
 * @category Multi-Window
 * @tags sliding window, numeric, key value, average
 * @since 0.3.3
 */
public abstract class AbstractSlidingWindowKeyVal<K, V extends Number, S extends SimpleMovingAverageObject>
		extends BaseNumberKeyValueOperator<K, V>
{
	/**
	 * buffer to hold state information of different windows.
	 */
	protected HashMap<K, ArrayList<S>> buffer = new HashMap<K, ArrayList<S>>();
	/**
	 * Index of windows stating at 0.
	 */
	protected int currentstate = -1;

	/**
	 * Concrete class has to implement how they want the tuple to be processed.
	 *
	 * @param tuple
	 *          a keyVal pair of tuple.
	 */
	public abstract void processDataTuple(KeyValPair<K, V> tuple);

	/**
	 * Concrete class has to implement what to emit at the end of window.
	 *
	 * @param key
	 * @param obj
	 */
	public abstract void emitTuple(K key, ArrayList<S> obj);

	/**
	 * Length of sliding windows. Minimum value is 2.
	 */
	@Min(2)
	protected int windowSize = 2;
	protected long windowId;

	/**
	 * Getter function for windowSize (number of previous window buffer).
	 *
	 * @return windowSize
	 */
	public int getWindowSize()
	{
		return windowSize;
	}

	/**
	 * @param windowSize
	 */
	public void setWindowSize(int windowSize)
	{
		this.windowSize = windowSize;
	}

	/**
	 * Input port for getting incoming data.
	 */
	public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>()
	{
		@Override
		public void process(KeyValPair<K, V> tuple)
		{
			processDataTuple(tuple);
		}
	};

	/**
	 * Moves buffer by 1 and clear contents of current. If you override
	 * beginWindow, you must call super.beginWindow(windowId) to ensure proper
	 * operator behavior.
	 *
	 * @param windowId
	 */
	@Override
	public void beginWindow(long windowId)
	{
		this.windowId = windowId;
		currentstate++;
		if (currentstate >= windowSize) {
			for (Map.Entry<K, ArrayList<S>> e : buffer.entrySet()) {
				ArrayList<S> states = e.getValue();
                		S first = states.get(0);
				for (int i=1; i < windowSize; i++) states.set(i-1, states.get(i));
				states.set(windowSize-1, first);
			}
			currentstate = windowSize-1;
		}
		for (Map.Entry<K, ArrayList<S>> e : buffer.entrySet()) {
			e.getValue().get(currentstate).clear();
		}
	}

	/**
	 * Emit tuple for each key.
	 */
	@Override
	public void endWindow()
	{
		for (Map.Entry<K, ArrayList<S>> e : buffer.entrySet()) {
			emitTuple(e.getKey(), e.getValue());
		}
	}
}
