#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing,
#  software distributed under the License is distributed on an
#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#  KIND, either express or implied.  See the License for the
#  specific language governing permissions and limitations
#  under the License.

'''window_bolt.py: API for defining windowed bolts in Heron'''
from abc import abstractmethod
from collections import namedtuple, deque
import time
from heronpy.api.bolt.bolt import Bolt
import heronpy.api.api_constants as api_constants
from heronpy.api.state.stateful_component import StatefulComponent

WindowContext = namedtuple('WindowContext', ('start', 'end'))

class SlidingWindowBolt(Bolt, StatefulComponent):
  """SlidingWindowBolt is a higer level bolt for Heron users who want to deal with
     batches of tuples belonging to a certain time window. This bolt keeps track of
     managing the window, adding/expiring tuples based on window configuration.
     This way users will just have to deal with writing processWindow function
  """
  WINDOW_DURATION_SECS = 'slidingwindowbolt_duration_secs'
  WINDOW_SLIDEINTERVAL_SECS = 'slidingwindowbolt_slideinterval_secs'

  # pylint: disable=attribute-defined-outside-init
  def init_state(self, stateful_state):
    self.saved_state = stateful_state

  # pylint: disable=unused-argument
  def pre_save(self, checkpoint_id):
    self.saved_state['tuples'] = self.current_tuples

  @abstractmethod
  def processWindow(self, window_info, tuples):
    """The main interface that needs to be implemented.

    This function is called every WINDOW_SLIDEINTERVAL_SECS seconds
    and contains the data in the last WINDOW_DURATION_SECS seconds
    in a list tuples

    :type window_info: :class:`WindowContext`
    :param window_info: The information about the window

    :type tuples: :class:`list of Tuples`
    :param tuples: The list of tuples in this window
    """
    pass

  # pylint: disable=unused-argument
  def initialize(self, config, context):
    """We initialize the window duration and slide interval
    """
    if SlidingWindowBolt.WINDOW_DURATION_SECS in config:
      self.window_duration = int(config[SlidingWindowBolt.WINDOW_DURATION_SECS])
    else:
      self.logger.fatal("Window Duration has to be specified in the config")
    if SlidingWindowBolt.WINDOW_SLIDEINTERVAL_SECS in config:
      self.slide_interval = int(config[SlidingWindowBolt.WINDOW_SLIDEINTERVAL_SECS])
    else:
      self.slide_interval = self.window_duration
    if self.slide_interval > self.window_duration:
      self.logger.fatal("Slide Interval should be <= Window Duration")

    # By modifying the config, we are able to setup the tick timer
    config[api_constants.TOPOLOGY_TICK_TUPLE_FREQ_SECS] = str(self.slide_interval)
    self.current_tuples = deque()
    if hasattr(self, 'saved_state'):
      if 'tuples' in self.saved_state:
        self.current_tuples = self.saved_state['tuples']

  def process(self, tup):
    """Process a single tuple of input

    We add the (time, tuple) pair into our current_tuples. And then look for expiring
    elemnents
    """
    curtime = int(time.time())
    self.current_tuples.append((tup, curtime))
    self._expire(curtime)

  def _expire(self, tm):
    while len(self.current_tuples) > 0:
      if tm - self.window_duration > self.current_tuples[0][1]:
        (tup, _) = self.current_tuples.popleft()
        self.ack(tup)
      else:
        break

  # pylint: disable=unused-argument
  # pylint: disable=unused-variable
  def process_tick(self, tup):
    """Called every slide_interval
    """
    curtime = int(time.time())
    window_info = WindowContext(curtime - self.window_duration, curtime)
    tuple_batch = []
    for (tup, tm) in self.current_tuples:
      tuple_batch.append(tup)
    self.processWindow(window_info, tuple_batch)
    self._expire(curtime)


class TumblingWindowBolt(Bolt, StatefulComponent):
  """TumblingWindowBolt is a higer level bolt for Heron users who want to deal with
     batches of tuples belonging to a certain time window. This bolt keeps track of
     managing the window, adding/expiring tuples based on window configuration.
     This way users will just have to deal with writing processWindow function
  """
  WINDOW_DURATION_SECS = 'tumblingwindowbolt_duration_secs'

  # pylint: disable=attribute-defined-outside-init
  def init_state(self, stateful_state):
    self.saved_state = stateful_state

  # pylint: disable=unused-argument
  def pre_save(self, checkpoint_id):
    self.saved_state['tuples'] = self.current_tuples

  @abstractmethod
  def processWindow(self, window_info, tuples):
    """The main interface that needs to be implemented.

    This function is called every WINDOW_DURATION_SECS seconds
    and contains the data in the last WINDOW_DURATION_SECS seconds
    in a list tuples

    :type window_info: :class:`WindowContext`
    :param window_info: The information about the window

    :type tuples: :class:`list of Tuples`
    :param tuples: The list of tuples in this window
    """
    pass

  # pylint: disable=unused-argument
  def initialize(self, config, context):
    """We initialize the window duration and slide interval
    """
    if TumblingWindowBolt.WINDOW_DURATION_SECS in config:
      self.window_duration = int(config[TumblingWindowBolt.WINDOW_DURATION_SECS])
    else:
      self.logger.fatal("Window Duration has to be specified in the config")

    # By modifying the config, we are able to setup the tick timer
    config[api_constants.TOPOLOGY_TICK_TUPLE_FREQ_SECS] = str(self.window_duration)
    self.current_tuples = deque()
    if hasattr(self, 'saved_state'):
      if 'tuples' in self.saved_state:
        self.current_tuples = self.saved_state['tuples']

  def process(self, tup):
    """Process a single tuple of input

    We simply add the tuple into our current_tuples.
    """
    self.current_tuples.append(tup)

  # pylint: disable=unused-argument
  # pylint: disable=unused-variable
  def process_tick(self, tup):
    """Called every window_duration
    """
    curtime = int(time.time())
    window_info = WindowContext(curtime - self.window_duration, curtime)
    self.processWindow(window_info, list(self.current_tuples))
    for tup in self.current_tuples:
      self.ack(tup)
    self.current_tuples.clear()
