/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.fluo.core.impl;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.data.Mutation;
import org.apache.fluo.core.util.FluoThreadFactory;

// created this class because batch writer blocks adding mutations while its flushing

public class SharedBatchWriter {

  private final BatchWriter bw;
  private ArrayBlockingQueue<MutationBatch> mutQueue;
  private MutationBatch end = new MutationBatch(new ArrayList<Mutation>(), false);

  private AtomicLong asyncBatchesAdded = new AtomicLong(0);
  private long asyncBatchesProcessed = 0;
  // added to avoid findbugs false positive
  private static final Supplier<Void> NULLS = () -> null;

  private static class MutationBatch {

    private Collection<Mutation> mutations;
    private CountDownLatch cdl;
    private boolean isAsync = false;
    private CompletableFuture<Void> cf;

    public MutationBatch(Collection<Mutation> mutations, boolean isAsync) {
      this.mutations = mutations;
      this.isAsync = isAsync;
      if (!isAsync) {
        this.cdl = new CountDownLatch(1);
      }
    }

    public MutationBatch(Collection<Mutation> mutations, CompletableFuture<Void> cf) {
      this.mutations = mutations;
      this.cf = cf;
      this.cdl = null;
      this.isAsync = false;
    }

    public void countDown() {
      if (cdl != null) {
        cdl.countDown();
      }

      if (cf != null) {
        cf.complete(NULLS.get());
      }
    }
  }

  private class FlushTask implements Runnable {

    @Override
    public void run() {
      boolean keepRunning = true;
      ArrayList<MutationBatch> batches = new ArrayList<>();

      while (keepRunning || batches.size() > 0) {
        try {
          if (batches.size() == 0) {
            batches.add(mutQueue.take());
          }
          mutQueue.drainTo(batches);

          processBatches(batches);

          for (MutationBatch mutationBatch : batches) {
            if (mutationBatch == end) {
              keepRunning = false;
            }
          }

          batches.clear();
        } catch (Exception e) {
          // TODO error handling
          e.printStackTrace();
        }
      }

    }

    private void processBatches(ArrayList<MutationBatch> batches)
        throws MutationsRejectedException {
      for (MutationBatch mutationBatch : batches) {
        if (mutationBatch != end) {
          bw.addMutations(mutationBatch.mutations);
        }
      }

      bw.flush();

      int numAsync = 0;

      for (MutationBatch mutationBatch : batches) {
        mutationBatch.countDown();

        if (mutationBatch.isAsync) {
          numAsync++;
        }
      }

      if (numAsync > 0) {
        synchronized (SharedBatchWriter.this) {
          asyncBatchesProcessed += numAsync;
          SharedBatchWriter.this.notifyAll();
        }
      }
    }
  }

  SharedBatchWriter(BatchWriter bw) {
    this.bw = bw;
    this.mutQueue = new ArrayBlockingQueue<>(100000);
    Thread thread = new FluoThreadFactory("sharedBW").newThread(new FlushTask());
    thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(Thread t, Throwable e) {
        System.err.println("Uncaught exception in shared batch writer");
        e.printStackTrace();
      }
    });
    thread.setDaemon(true);
    thread.start();
  }

  void writeMutation(Mutation m) {
    writeMutations(Collections.singletonList(m));
  }

  void writeMutations(Collection<Mutation> ml) {

    if (ml.size() == 0) {
      return;
    }

    try {
      MutationBatch mb = new MutationBatch(ml, false);
      mutQueue.put(mb);
      mb.cdl.await();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  CompletableFuture<Void> writeMutationsAsyncFuture(Collection<Mutation> ml) {
    if (ml.size() == 0) {
      return CompletableFuture.completedFuture(NULLS.get());
    }

    CompletableFuture<Void> cf = new CompletableFuture<>();
    try {
      MutationBatch mb = new MutationBatch(ml, cf);
      mutQueue.put(mb);
      return cf;
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  CompletableFuture<Void> writeMutationsAsyncFuture(Mutation m) {
    return writeMutationsAsyncFuture(Collections.singleton(m));
  }

  void close() {
    try {
      mutQueue.put(end);
      end.cdl.await();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  void writeMutationsAsync(List<Mutation> ml) {
    try {
      MutationBatch mb = new MutationBatch(ml, true);
      asyncBatchesAdded.incrementAndGet();
      mutQueue.put(mb);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  void writeMutationAsync(Mutation m) {
    writeMutationsAsync(Collections.singletonList(m));
  }

  /**
   * waits for all async mutations that were added before this was called to be flushed. Does not
   * wait for async mutations added after call.
   */
  public void waitForAsyncFlush() {
    long numAdded = asyncBatchesAdded.get();

    synchronized (this) {
      while (numAdded > asyncBatchesProcessed) {
        try {
          wait();
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
    }
  }
}
