#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""`iobase.RestrictionTracker` implementations provided with Apache Beam."""
from __future__ import absolute_import
from __future__ import division

import threading
from builtins import object

from apache_beam.io.iobase import RestrictionProgress
from apache_beam.io.iobase import RestrictionTracker
from apache_beam.io.range_trackers import OffsetRangeTracker


class OffsetRange(object):

  def __init__(self, start, stop):
    if start > stop:
      raise ValueError(
          'Start offset must be not be larger than the stop offset. '
          'Received %d and %d respectively.' % (start, stop))
    self.start = start
    self.stop = stop

  def __eq__(self, other):
    if not isinstance(other, OffsetRange):
      return False

    return self.start == other.start and self.stop == other.stop

  def __ne__(self, other):
    # TODO(BEAM-5949): Needed for Python 2 compatibility.
    return not self == other

  def __hash__(self):
    return hash((type(self), self.start, self.stop))

  def split(self, desired_num_offsets_per_split, min_num_offsets_per_split=1):
    current_split_start = self.start
    max_split_size = max(desired_num_offsets_per_split,
                         min_num_offsets_per_split)
    while current_split_start < self.stop:
      current_split_stop = min(current_split_start + max_split_size, self.stop)
      remaining = self.stop - current_split_stop

      # Avoiding a small split at the end.
      if (remaining < desired_num_offsets_per_split // 4 or
          remaining < min_num_offsets_per_split):
        current_split_stop = self.stop

      yield OffsetRange(current_split_start, current_split_stop)
      current_split_start = current_split_stop

  def split_at(self, split_pos):
    return OffsetRange(self.start, split_pos), OffsetRange(split_pos, self.stop)

  def new_tracker(self):
    return OffsetRangeTracker(self.start, self.stop)

  def size(self):
    return self.stop - self.start


class OffsetRestrictionTracker(RestrictionTracker):
  """An `iobase.RestrictionTracker` implementations for an offset range.

  Offset range is represented as OffsetRange.
  """

  def __init__(self, offset_range):
    assert isinstance(offset_range, OffsetRange)
    self._range = offset_range
    self._current_position = None
    self._current_watermark = None
    self._last_claim_attempt = None
    self._deferred_residual = None
    self._checkpointed = False
    self._lock = threading.RLock()

  def check_done(self):
    with self._lock:
      if self._last_claim_attempt < self._range.stop - 1:
        raise ValueError(
            'OffsetRestrictionTracker is not done since work in range [%s, %s) '
            'has not been claimed.'
            % (self._last_claim_attempt if self._last_claim_attempt is not None
               else self._range.start,
               self._range.stop))

  def current_restriction(self):
    with self._lock:
      return self._range

  def current_watermark(self):
    return self._current_watermark

  def current_progress(self):
    with self._lock:
      if self._current_position is None:
        fraction = 0.0
      elif self._range.stop == self._range.start:
        # If self._current_position is not None, we must be done.
        fraction = 1.0
      else:
        fraction = (
            float(self._current_position - self._range.start)
            / (self._range.stop - self._range.start))
    return RestrictionProgress(fraction=fraction)

  def start_position(self):
    with self._lock:
      return self._range.start

  def stop_position(self):
    with self._lock:
      return self._range.stop

  def default_size(self):
    return self._range.size()

  def try_claim(self, position):
    with self._lock:
      if self._last_claim_attempt and position <= self._last_claim_attempt:
        raise ValueError(
            'Positions claimed should strictly increase. Trying to claim '
            'position %d while last claim attempt was %d.'
            % (position, self._last_claim_attempt))

      self._last_claim_attempt = position
      if position < self._range.start:
        raise ValueError(
            'Position to be claimed cannot be smaller than the start position '
            'of the range. Tried to claim position %r for the range [%r, %r)'
            % (position, self._range.start, self._range.stop))

      if position >= self._range.start and position < self._range.stop:
        self._current_position = position
        return True

      return False

  def try_split(self, fraction_of_remainder):
    with self._lock:
      if not self._checkpointed:
        if self._current_position is None:
          cur = self._range.start - 1
        else:
          cur = self._current_position
        split_point = (
            cur + int(max(1, (self._range.stop - cur) * fraction_of_remainder)))
        if split_point < self._range.stop:
          self._range, residual_range = self._range.split_at(split_point)
          return self._range, residual_range

  # TODO(SDF): Replace all calls with try_claim(0).
  def checkpoint(self):
    with self._lock:
      # If self._current_position is 'None' no records have been claimed so
      # residual should start from self._range.start.
      if self._current_position is None:
        end_position = self._range.start
      else:
        end_position = self._current_position + 1
      self._range, residual_range = self._range.split_at(end_position)
      return residual_range

  def defer_remainder(self, watermark=None):
    with self._lock:
      self._deferred_watermark = watermark or self._current_watermark
      self._deferred_residual = self.checkpoint()

  def deferred_status(self):
    if self._deferred_residual:
      return (self._deferred_residual, self._deferred_watermark)
