/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.fluo.core.worker;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import com.codahale.metrics.Gauge;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.observer.Observers;
import org.apache.fluo.core.util.FluoExecutors;
import org.apache.fluo.core.util.Hex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NotificationProcessor implements AutoCloseable {

  private static final Logger log = LoggerFactory.getLogger(NotificationProcessor.class);

  private NotificationTracker tracker;
  private ThreadPoolExecutor executor;
  private Environment env;
  private Observers observers;
  private PriorityBlockingQueue<Runnable> queue;

  public NotificationProcessor(Environment env) {
    int numThreads = env.getConfiguration().getWorkerThreads();
    this.env = env;
    this.queue = new PriorityBlockingQueue<>();
    this.executor = FluoExecutors.newFixedThreadPool(numThreads, queue, "ntfyProc");
    this.tracker = new NotificationTracker();
    this.observers = env.getConfiguredObservers().getObservers(env);
    env.getSharedResources().getMetricRegistry().register(
        env.getMetricNames().getNotificationQueued(), (Gauge<Integer>) () -> queue.size());
  }

  // little utility class that tracks all notifications in queue
  private class NotificationTracker {
    private Map<RowColumn, Future<?>> queuedWork = new HashMap<>();
    private Set<RowColumn> recentlyDeleted = new HashSet<>();
    private long sizeInBytes = 0;
    private Map<Long, Predicate<RowColumn>> memoryPredicates = new HashMap<>();
    private Predicate<RowColumn> memoryPredicate = rc -> false;
    private static final long MAX_SIZE = 1 << 24;
    private long nextSessionId = 0;

    private long size(RowColumn rowCol) {
      Column col = rowCol.getColumn();
      return (long) rowCol.getRow().length() + col.getFamily().length()
          + col.getQualifier().length() + col.getVisibility().length();
    }

    public synchronized boolean add(RowColumn rowCol, Future<?> task) {

      if (queuedWork.containsKey(rowCol) || recentlyDeleted.contains(rowCol)) {
        return false;
      }

      while (sizeInBytes > MAX_SIZE) {
        try {
          wait(1000);
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }

      if (queuedWork.containsKey(rowCol) || recentlyDeleted.contains(rowCol)) {
        return false;
      }

      queuedWork.put(rowCol, task);
      sizeInBytes += size(rowCol);
      return true;
    }

    public synchronized void remove(RowColumn rowCol) {
      if (queuedWork.remove(rowCol) != null) {
        if (memoryPredicate.test(rowCol)) {
          recentlyDeleted.add(rowCol);
        }
        sizeInBytes -= size(rowCol);
        notify();
      }
    }

    public synchronized void clear() {
      for (Future<?> task : queuedWork.values()) {
        task.cancel(false);
      }

      queuedWork.clear();
      sizeInBytes = 0;
      notify();
    }

    public synchronized boolean requeue(RowColumn rowCol, FutureTask<?> ft) {
      if (!queuedWork.containsKey(rowCol)) {
        return false;
      }

      queuedWork.put(rowCol, ft);

      return true;
    }

    private void resetMemoryPredicate() {
      memoryPredicate = null;
      for (Predicate<RowColumn> p : this.memoryPredicates.values()) {
        if (memoryPredicate == null) {
          memoryPredicate = p;
        } else {
          memoryPredicate = p.or(memoryPredicate);
        }
      }
    }

    public synchronized long beginAddingNotifications(Predicate<RowColumn> memoryPredicate) {
      long sessionId = nextSessionId++;
      this.memoryPredicates.put(sessionId, Objects.requireNonNull(memoryPredicate));
      resetMemoryPredicate();
      return sessionId;
    }

    public synchronized void finishAddingNotifications(long sessionId) {
      this.memoryPredicates.remove(sessionId);
      if (memoryPredicates.isEmpty()) {
        recentlyDeleted.clear();
        memoryPredicate = rc -> false;
      } else {
        resetMemoryPredicate();
      }
    }

  }

  private class NotificationProcessingTask implements Runnable {

    Notification notification;
    NotificationFinder notificationFinder;
    WorkTaskAsync workTask;

    NotificationProcessingTask(Notification n, NotificationFinder nf, WorkTaskAsync wt) {
      this.notification = n;
      this.notificationFinder = nf;
      this.workTask = wt;
    }

    @Override
    public void run() {
      try {
        // Its possible that while the notification was in the queue the situation changed and it
        // should no longer be processed by this worker. So ask as late as possible if this
        // notification should be processed.
        if (notificationFinder.shouldProcess(notification)) {
          workTask.run();
        } else {
          notificationProcessed(notification);
        }
      } catch (Exception e) {
        log.error("Failed to process work " + Hex.encNonAscii(notification), e);
      }
    }

  }

  private class FutureNotificationTask extends FutureTask<Void>
      implements Comparable<FutureNotificationTask> {

    private final Notification notification;

    public FutureNotificationTask(Notification n, NotificationFinder nf, WorkTaskAsync wt) {
      super(new NotificationProcessingTask(n, nf, wt), null);
      this.notification = n;
    }

    @Override
    public int compareTo(FutureNotificationTask o) {
      return Long.compare(notification.getTimestamp(), o.notification.getTimestamp());
    }

    @Override
    public boolean equals(Object o) {
      if (o instanceof FutureNotificationTask) {
        return compareTo((FutureNotificationTask) o) == 0;
      }
      return false;
    }

    @Override
    public int hashCode() {
      return Long.hashCode(notification.getTimestamp());
    }

    @Override
    protected void setException(Throwable t) {
      super.setException(t);
      System.err.println("Uncaught Exception ");
      t.printStackTrace();
    }
  }

  public class Session implements AutoCloseable {
    private long id;

    public Session(Predicate<RowColumn> memoryPredicate) {
      this.id = tracker.beginAddingNotifications(memoryPredicate);
    }

    public boolean addNotification(final NotificationFinder notificationFinder,
        final Notification notification) {

      WorkTaskAsync workTask = new WorkTaskAsync(NotificationProcessor.this, notificationFinder,
          env, notification, observers);
      FutureTask<?> ft = new FutureNotificationTask(notification, notificationFinder, workTask);

      if (!tracker.add(notification.getRowColumn(), ft)) {
        return false;
      }

      try {
        executor.execute(ft);
      } catch (RejectedExecutionException rje) {
        tracker.remove(notification.getRowColumn());
        throw rje;
      }

      return true;
    }

    @Override
    public void close() {
      tracker.finishAddingNotifications(id);
    }
  }

  /**
   * Starts a session for adding notifications. During this session, any notifications that are
   * deleted and match the predicate will be remembered. These remembered notifications can not be
   * added again while the session is active.
   */
  public Session beginAddingNotifications(Predicate<RowColumn> memoryPredicate) {
    return new Session(memoryPredicate);
  }

  public void requeueNotification(final NotificationFinder notificationFinder,
      final Notification notification) {

    WorkTaskAsync workTask =
        new WorkTaskAsync(this, notificationFinder, env, notification, observers);
    FutureTask<?> ft = new FutureNotificationTask(notification, notificationFinder, workTask);

    if (tracker.requeue(notification.getRowColumn(), ft)) {
      try {
        executor.execute(ft);
      } catch (RejectedExecutionException rje) {
        tracker.remove(notification.getRowColumn());
        throw rje;
      }
    }
  }

  public void notificationProcessed(final Notification notification) {
    tracker.remove(notification.getRowColumn());
  }

  public int size() {
    return queue.size();
  }

  public void clear() {
    tracker.clear();
    executor.purge();
  }

  @Override
  public void close() {
    executor.shutdownNow();
    observers.close();

    try {
      while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {

      }
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}
