/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.malhar.lib.window.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import javax.validation.ValidationException;

import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.apex.malhar.lib.state.spillable.WindowListener;
import org.apache.apex.malhar.lib.window.Accumulation;
import org.apache.apex.malhar.lib.window.ControlTuple;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.Window;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.lib.window.WindowState;
import org.apache.apex.malhar.lib.window.WindowedOperator;
import org.apache.apex.malhar.lib.window.WindowedStorage;
import org.apache.hadoop.classification.InterfaceStability;

import com.google.common.base.Function;

import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;

/**
 * This is the abstract windowed operator class that implements most of the windowing, triggering, and accumulating
 * concepts. The subclass of this abstract class is supposed to provide the implementation of how the accumulated
 * values are stored in the storage.
 *
 * @param <InputT> The type of the input tuple
 * @param <OutputT> The type of the output tuple
 * @param <DataStorageT> The type of the data storage
 * @param <RetractionStorageT> The type of the retraction storage
 * @param <AccumulationT> The type of the accumulation
 *
 * @since 3.5.0
 */
@InterfaceStability.Evolving
public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT extends WindowedStorage, RetractionStorageT extends WindowedStorage, AccumulationT extends Accumulation>
    extends BaseOperator implements WindowedOperator<InputT>, Operator.CheckpointNotificationListener
{

  protected WindowOption windowOption;
  protected TriggerOption triggerOption;
  protected long allowedLatenessMillis = -1;
  protected WindowedStorage.WindowedPlainStorage<WindowState> windowStateMap;

  private Function<InputT, Long> timestampExtractor;

  private long currentWatermark = -1;
  private long watermarkTimestamp = -1;
  private boolean triggerAtWatermark;
  private long earlyTriggerCount;
  private long earlyTriggerMillis;
  private long lateTriggerCount;
  private long lateTriggerMillis;
  private long currentDerivedTimestamp = -1;
  private long timeIncrement;
  private long fixedWatermarkMillis = -1;

  private Map<String, Component<Context.OperatorContext>> components = new HashMap<>();

  protected DataStorageT dataStorage;
  protected RetractionStorageT retractionStorage;
  protected AccumulationT accumulation;

  private static final transient Collection<? extends Window> GLOBAL_WINDOW_SINGLETON_SET = Collections.singleton(Window.GlobalWindow.INSTANCE);
  private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedOperator.class);

  public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>()
  {
    @Override
    public void process(Tuple<InputT> tuple)
    {
      processTuple(tuple);
    }
  };

  // TODO: This port should be removed when Apex Core has native support for custom control tuples
  @InputPortFieldAnnotation(optional = true)
  public final transient DefaultInputPort<ControlTuple> controlInput = new DefaultInputPort<ControlTuple>()
  {
    @Override
    public void process(ControlTuple tuple)
    {
      if (tuple instanceof ControlTuple.Watermark) {
        processWatermark((ControlTuple.Watermark)tuple);
      }
    }
  };

  // TODO: multiple input ports for join operations

  public final transient DefaultOutputPort<Tuple.WindowedTuple<OutputT>> output = new DefaultOutputPort<>();

  // TODO: This port should be removed when Apex Core has native support for custom control tuples
  public final transient DefaultOutputPort<ControlTuple> controlOutput = new DefaultOutputPort<>();

  /**
   * Process the incoming data tuple
   *
   * @param tuple the incoming tuple
   */
  public void processTuple(Tuple<InputT> tuple)
  {
    long timestamp = extractTimestamp(tuple);
    if (isTooLate(timestamp)) {
      dropTuple(tuple);
    } else {
      Tuple.WindowedTuple<InputT> windowedTuple = getWindowedValue(tuple);
      // do the accumulation
      accumulateTuple(windowedTuple);

      for (Window window : windowedTuple.getWindows()) {
        WindowState windowState = windowStateMap.get(window);
        windowState.tupleCount++;
        // process any count based triggers
        if (windowState.watermarkArrivalTime == -1) {
          // watermark has not arrived yet, check for early count based trigger
          if (earlyTriggerCount > 0 && (windowState.tupleCount % earlyTriggerCount) == 0) {
            fireTrigger(window, windowState);
          }
        } else {
          // watermark has arrived, check for late count based trigger
          if (lateTriggerCount > 0 && (windowState.tupleCount % lateTriggerCount) == 0) {
            fireTrigger(window, windowState);
          }
        }
      }
    }
  }

  @Override
  public void setWindowOption(WindowOption windowOption)
  {
    this.windowOption = windowOption;
  }

  @Override
  public void setTriggerOption(TriggerOption triggerOption)
  {
    this.triggerOption = triggerOption;
    for (TriggerOption.Trigger trigger : triggerOption.getTriggerList()) {
      switch (trigger.getType()) {
        case ON_TIME:
          triggerAtWatermark = true;
          break;
        case EARLY:
          if (trigger instanceof TriggerOption.TimeTrigger) {
            earlyTriggerMillis = ((TriggerOption.TimeTrigger)trigger).getDuration().getMillis();
          } else if (trigger instanceof TriggerOption.CountTrigger) {
            earlyTriggerCount = ((TriggerOption.CountTrigger)trigger).getCount();
          }
          break;
        case LATE:
          if (trigger instanceof TriggerOption.TimeTrigger) {
            lateTriggerMillis = ((TriggerOption.TimeTrigger)trigger).getDuration().getMillis();
          } else if (trigger instanceof TriggerOption.CountTrigger) {
            lateTriggerCount = ((TriggerOption.CountTrigger)trigger).getCount();
          }
          break;
        default:
          throw new RuntimeException("Unknown trigger type: " + trigger.getType());
      }
    }
  }

  @Override
  public void setAllowedLateness(Duration allowedLateness)
  {
    this.allowedLatenessMillis = allowedLateness.getMillis();
  }

  /**
   * This method sets the storage for the data for each window
   *
   * @param dataStorage The data storage
   */
  public void setDataStorage(DataStorageT dataStorage)
  {
    this.dataStorage = dataStorage;
  }

  /**
   * This method sets the storage for the retraction data for each window. Only used when the accumulation mode is ACCUMULATING_AND_RETRACTING
   *
   * @param retractionStorage The retraction storage
   */
  public void setRetractionStorage(RetractionStorageT retractionStorage)
  {
    this.retractionStorage = retractionStorage;
  }

  public void addComponent(String key, Component<Context.OperatorContext> component)
  {
    components.put(key, component);
  }

  /**
   * Sets the accumulation, which basically tells the WindowedOperator what to do if a new tuple comes in and what
   * to put in the pane when a trigger is fired
   *
   * @param accumulation the accumulation
   */
  public void setAccumulation(AccumulationT accumulation)
  {
    this.accumulation = accumulation;
  }

  public void setWindowStateStorage(WindowedStorage.WindowedPlainStorage<WindowState> storageAgent)
  {
    this.windowStateMap = storageAgent;
  }

  @Override
  public void setTimestampExtractor(Function<InputT, Long> timestampExtractor)
  {
    this.timestampExtractor = timestampExtractor;
  }

  /**
   * Sets the fixed watermark with respect to the processing time derived from the Apex window ID. This is useful if we
   * don't have watermark tuples from upstream. However, using this means whether a tuple is considered late totally
   * depends on the Apex window ID of this operator.
   *
   * Note that setting this value will make incoming watermark tuples useless.
   */
  public void setFixedWatermark(long millis)
  {
    this.fixedWatermarkMillis = millis;
  }

  public void validate() throws ValidationException
  {
    if (accumulation == null) {
      throw new ValidationException("Accumulation must be set");
    }
    if (dataStorage == null) {
      throw new ValidationException("Data storage must be set");
    }
    if (windowStateMap == null) {
      throw new ValidationException("Window state storage must be set");
    }
    if (triggerOption != null) {
      if (triggerOption.isFiringOnlyUpdatedPanes()) {
        if (retractionStorage == null) {
          throw new ValidationException("A retraction storage is required for firingOnlyUpdatedPanes option");
        }
        if (triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.DISCARDING) {
          throw new ValidationException("DISCARDING accumulation mode is not valid for firingOnlyUpdatedPanes option");
        }
      }
      if (triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING &&
          retractionStorage == null) {
        throw new ValidationException("A retraction storage is required for ACCUMULATING_AND_RETRACTING accumulation mode");
      }
    }
  }

  @Override
  public Tuple.WindowedTuple<InputT> getWindowedValue(Tuple<InputT> input)
  {
    if (windowOption == null && input instanceof Tuple.WindowedTuple) {
      // inherit the windows from upstream
      return (Tuple.WindowedTuple<InputT>)input;
    } else {
      return new Tuple.WindowedTuple<>(assignWindows(input), extractTimestamp(input), input.getValue());
    }
  }

  private long extractTimestamp(Tuple<InputT> tuple)
  {
    if (timestampExtractor == null) {
      if (tuple instanceof Tuple.TimestampedTuple) {
        return ((Tuple.TimestampedTuple)tuple).getTimestamp();
      } else {
        return 0;
      }
    } else {
      return timestampExtractor.apply(tuple.getValue());
    }
  }

  private Collection<? extends Window> assignWindows(Tuple<InputT> inputTuple)
  {
    if (windowOption instanceof WindowOption.GlobalWindow) {
      return GLOBAL_WINDOW_SINGLETON_SET;
    } else {
      long timestamp = extractTimestamp(inputTuple);
      if (windowOption instanceof WindowOption.TimeWindows) {
        Collection<? extends Window> windows = getTimeWindowsForTimestamp(timestamp);
        for (Window window : windows) {
          if (!windowStateMap.containsWindow(window)) {
            windowStateMap.put(window, new WindowState());
          }
        }
        return windows;
      } else if (windowOption instanceof WindowOption.SessionWindows) {
        return assignSessionWindows(timestamp, inputTuple);
      } else {
        throw new IllegalStateException("Unsupported Window Option: " + windowOption.getClass());
      }
    }
  }

  protected Collection<Window.SessionWindow> assignSessionWindows(long timestamp, Tuple<InputT> inputTuple)
  {
    throw new UnsupportedOperationException("Session window require keyed tuples");
  }

  /**
   * Returns the list of windows TimeWindows for the given timestamp.
   * If we are doing sliding windows, this will return multiple windows. Otherwise, only one window will be returned.
   * Note that this method does not apply to SessionWindows.
   *
   * @param timestamp the timestamp
   * @return the windows this timestamp belongs to
   */
  private Collection<Window.TimeWindow> getTimeWindowsForTimestamp(long timestamp)
  {
    List<Window.TimeWindow> windows = new ArrayList<>();
    if (windowOption instanceof WindowOption.TimeWindows) {
      long durationMillis = ((WindowOption.TimeWindows)windowOption).getDuration().getMillis();
      long beginTimestamp = timestamp - timestamp % durationMillis;
      windows.add(new Window.TimeWindow(beginTimestamp, durationMillis));
      if (windowOption instanceof WindowOption.SlidingTimeWindows) {
        long slideBy = ((WindowOption.SlidingTimeWindows)windowOption).getSlideByDuration().getMillis();
        // add the sliding windows front and back
        for (long slideBeginTimestamp = beginTimestamp - slideBy;
            slideBeginTimestamp <= timestamp && timestamp < slideBeginTimestamp + durationMillis;
            slideBeginTimestamp -= slideBy) {
          windows.add(new Window.TimeWindow(slideBeginTimestamp, durationMillis));
        }
        for (long slideBeginTimestamp = beginTimestamp + slideBy;
            slideBeginTimestamp <= timestamp && timestamp < slideBeginTimestamp + durationMillis;
            slideBeginTimestamp += slideBy) {
          windows.add(new Window.TimeWindow(slideBeginTimestamp, durationMillis));
        }
      }
    } else {
      throw new IllegalStateException("Unexpected WindowOption");
    }
    return windows;
  }

  @Override
  public boolean isTooLate(long timestamp)
  {
    return allowedLatenessMillis >= 0 && (timestamp < currentWatermark - allowedLatenessMillis);
  }

  @Override
  public void dropTuple(Tuple<InputT> input)
  {
    // do nothing
    LOG.debug("Dropping late tuple {}", input);
  }

  @Override
  public void processWatermark(ControlTuple.Watermark watermark)
  {
    this.watermarkTimestamp = watermark.getTimestamp();
  }

  @Override
  @SuppressWarnings("unchecked")
  public void setup(Context.OperatorContext context)
  {
    this.timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
    validate();
    windowStateMap.setup(context);
    dataStorage.setup(context);
    if (retractionStorage != null) {
      retractionStorage.setup(context);
    }
    for (Component component : components.values()) {
      component.setup(context);
    }
    if (this.windowOption instanceof WindowOption.GlobalWindow) {
      windowStateMap.put(Window.GlobalWindow.INSTANCE, new WindowState());
    }
  }

  @Override
  public void teardown()
  {
    windowStateMap.teardown();
    dataStorage.teardown();
    if (retractionStorage != null) {
      retractionStorage.teardown();
    }
    for (Component component : components.values()) {
      component.teardown();
    }
  }

  /**
   * This is for the Apex streaming/application window. Do not confuse this with the windowing concept in this operator
   */
  @Override
  public void beginWindow(long windowId)
  {
    for (Component component : components.values()) {
      if (component instanceof WindowListener) {
        ((WindowListener)component).beginWindow(windowId);
      }
    }
    if (currentDerivedTimestamp == -1) {
      // TODO: once we are able to get the firstWindowMillis from Apex Core API, we should use that instead
      currentDerivedTimestamp = (windowId >> 32) * 1000;
    } else {
      currentDerivedTimestamp += timeIncrement;
    }
    watermarkTimestamp = -1;
  }

  /**
   * This is for the Apex streaming/application window. Do not confuse this with the windowing concept in this operator
   */
  @Override
  public void endWindow()
  {
    // We only do actual processing of watermark at window boundary so that it will not break idempotency.
    // TODO: May want to revisit this if the application cares more about latency than idempotency
    processWatermarkAtEndWindow();
    fireTimeTriggers();

    for (Component component : components.values()) {
      if (component instanceof WindowListener) {
        ((WindowListener)component).endWindow();
      }
    }
  }

  private void processWatermarkAtEndWindow()
  {
    if (fixedWatermarkMillis > 0) {
      watermarkTimestamp = currentDerivedTimestamp - fixedWatermarkMillis;
    }
    if (watermarkTimestamp > 0) {
      this.currentWatermark = watermarkTimestamp;

      long horizon = watermarkTimestamp - allowedLatenessMillis;

      for (Iterator<Map.Entry<Window, WindowState>> it = windowStateMap.entries().iterator(); it.hasNext(); ) {
        Map.Entry<Window, WindowState> entry = it.next();
        Window window = entry.getKey();
        WindowState windowState = entry.getValue();
        if (window.getBeginTimestamp() + window.getDurationMillis() < watermarkTimestamp) {
          // watermark has not arrived for this window before, marking this window late
          if (windowState.watermarkArrivalTime == -1) {
            windowState.watermarkArrivalTime = currentDerivedTimestamp;
            if (triggerAtWatermark) {
              // fire trigger at watermark if applicable
              fireTrigger(window, windowState);
            }
          }
          if (allowedLatenessMillis >= 0 && window.getBeginTimestamp() + window.getDurationMillis() < horizon) {
            // discard this window because it's too late now
            it.remove();
            dataStorage.remove(window);
            if (retractionStorage != null) {
              retractionStorage.remove(window);
            }
          }
        }
      }
      controlOutput.emit(new WatermarkImpl(watermarkTimestamp));
    }
  }

  private void fireTimeTriggers()
  {
    if (earlyTriggerMillis > 0 || lateTriggerMillis > 0) {
      for (Map.Entry<Window, WindowState> entry : windowStateMap.entries()) {
        Window window = entry.getKey();
        WindowState windowState = entry.getValue();
        if (windowState.watermarkArrivalTime == -1) {
          if (earlyTriggerMillis > 0 && windowState.lastTriggerFiredTime + earlyTriggerMillis <= currentDerivedTimestamp) {
            // fire early time triggers
            fireTrigger(window, windowState);
          }
        } else {
          if (lateTriggerMillis > 0 && windowState.lastTriggerFiredTime + lateTriggerMillis <= currentDerivedTimestamp) {
            // fire late time triggers
            fireTrigger(window, windowState);
          }
        }
      }
    }
  }

  @Override
  public void fireTrigger(Window window, WindowState windowState)
  {
    if (triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
      fireRetractionTrigger(window);
    }
    fireNormalTrigger(window, triggerOption.isFiringOnlyUpdatedPanes());
    windowState.lastTriggerFiredTime = currentDerivedTimestamp;
    if (triggerOption.getAccumulationMode() == TriggerOption.AccumulationMode.DISCARDING) {
      clearWindowData(window);
    }
  }

  /**
   * This method fires the normal trigger for the given window.
   *
   * @param window the window to fire trigger on
   * @param fireOnlyUpdatedPanes Do not fire trigger if the old value is the same as the new value. If true, retraction storage is required.
   */
  public abstract void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes);

  /**
   * This method fires the retraction trigger for the given window. This should only be valid if the accumulation
   * mode is ACCUMULATING_AND_RETRACTING
   *
   * @param window the window to fire the retraction trigger on
   */
  public abstract void fireRetractionTrigger(Window window);


  @Override
  public void clearWindowData(Window window)
  {
    dataStorage.remove(window);
  }

  @Override
  public void beforeCheckpoint(long windowId)
  {
    for (Component component : components.values()) {
      if (component instanceof CheckpointNotificationListener) {
        ((CheckpointNotificationListener)component).beforeCheckpoint(windowId);
      }
    }
  }

  @Override
  public void checkpointed(long windowId)
  {
    for (Component component : components.values()) {
      if (component instanceof CheckpointNotificationListener) {
        ((CheckpointNotificationListener)component).checkpointed(windowId);
      }
    }
  }

  @Override
  public void committed(long windowId)
  {
    for (Component component : components.values()) {
      if (component instanceof CheckpointNotificationListener) {
        ((CheckpointNotificationListener)component).committed(windowId);
      }
    }
  }
}
