/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.aurora.benchmark;

import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import javax.inject.Singleton;

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.TypeLiteral;

import org.apache.aurora.benchmark.fakes.FakeDriver;
import org.apache.aurora.benchmark.fakes.FakeOfferManager;
import org.apache.aurora.benchmark.fakes.FakeRescheduleCalculator;
import org.apache.aurora.benchmark.fakes.FakeSchedulerDriver;
import org.apache.aurora.benchmark.fakes.FakeStatsProvider;
import org.apache.aurora.common.application.ShutdownStage;
import org.apache.aurora.common.base.Command;
import org.apache.aurora.common.base.Commands;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.util.Clock;
import org.apache.aurora.common.util.testing.FakeClock;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ServerInfo;
import org.apache.aurora.scheduler.TaskIdGenerator;
import org.apache.aurora.scheduler.TaskStatusHandler;
import org.apache.aurora.scheduler.TaskStatusHandlerImpl;
import org.apache.aurora.scheduler.TierModule;
import org.apache.aurora.scheduler.base.AsyncUtil;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.config.CliOptions;
import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.mesos.DriverFactory;
import org.apache.aurora.scheduler.mesos.DriverSettings;
import org.apache.aurora.scheduler.mesos.FrameworkInfoFactory;
import org.apache.aurora.scheduler.mesos.MesosCallbackHandler;
import org.apache.aurora.scheduler.mesos.MesosCallbackHandler.MesosCallbackHandlerImpl;
import org.apache.aurora.scheduler.mesos.MesosSchedulerImpl;
import org.apache.aurora.scheduler.mesos.ProtosConversion;
import org.apache.aurora.scheduler.mesos.SchedulerDriverModule;
import org.apache.aurora.scheduler.mesos.TestExecutorSettings;
import org.apache.aurora.scheduler.offers.OfferManager;
import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
import org.apache.aurora.scheduler.state.ClusterStateImpl;
import org.apache.aurora.scheduler.state.StateModule;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.IServerInfo;
import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.apache.mesos.Scheduler;
import org.apache.mesos.v1.Protos;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.slf4j.LoggerFactory;

import static java.util.Objects.requireNonNull;

/**
 * Performance benchmarks for status update processing throughput. Note that we
 * need to send many updates and wait for all transitions to occur within one run
 * of the benchmark. This is because we don't want to assume that status updates
 * are processed synchronously.
 */
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS)
@Fork(1)
@Threads(1)
@State(Scope.Thread)
public class StatusUpdateBenchmark {
  /**
   * Simulates a slow storage backend by introducing latency on top of an
   * underlying storage implementation.
   *
   * TODO(bmahler): Consider specifying read and write latency separately.
   */
  private static final class SlowStorageWrapper implements Storage {
    private final Storage underlyingStorage;
    private Optional<Amount<Long, Time>> latency = Optional.empty();

    private SlowStorageWrapper(Storage underlyingStorage) {
      this.underlyingStorage = requireNonNull(underlyingStorage);
    }

    private void setLatency(Amount<Long, Time> latency) {
      this.latency = Optional.of(latency);
    }

    private void maybeSleep() {
      if (latency.isPresent()) {
        Uninterruptibles.sleepUninterruptibly(
            latency.get().getValue(),
            latency.get().getUnit().getTimeUnit());
      }
    }

    @Override
    public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
      maybeSleep();
      return underlyingStorage.read(work);
    }

    @Override
    public <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E {
      maybeSleep();
      return underlyingStorage.write(work);
    }

    @Override
    public void prepare() throws StorageException {
      underlyingStorage.prepare();
    }
  }

  // Benchmark with 1000 tasks to easily observe the kilo-qps of status
  // update processing. Consider varying this number if needed.
  private static final int NUM_TASKS = 1000;

  // Vary the storage latency to observe the effect on throughput.
  @Param({"5", "25", "100"})
  private long latencyMilliseconds;

  private Scheduler scheduler;
  private AbstractExecutionThreadService statusHandler;
  private SlowStorageWrapper storage;
  private EventBus eventBus;
  private Set<IScheduledTask> tasks;
  private CountDownLatch countDownLatch;

  /**
   * Run once per trial to set up the benchmark.
   */
  @Setup(Level.Trial)
  public void setUpBenchmark() {
    eventBus = new EventBus();
    storage = new SlowStorageWrapper(MemStorageModule.newEmptyStorage());

    Injector injector = Guice.createInjector(
        new StateModule(new CliOptions()),
        new TierModule(TaskTestUtil.TIER_CONFIG),
        new AbstractModule() {
          @Override
          protected void configure() {
            bind(Driver.class).toInstance(new FakeDriver());
            bind(Scheduler.class).to(MesosSchedulerImpl.class);
            bind(MesosSchedulerImpl.class).in(Singleton.class);
            bind(MesosCallbackHandler.class).to(MesosCallbackHandlerImpl.class);
            bind(MesosCallbackHandlerImpl.class).in(Singleton.class);
            bind(Executor.class)
                .annotatedWith(SchedulerDriverModule.SchedulerExecutor.class)
                .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor(
                    "SchedulerImpl-%d",
                    LoggerFactory.getLogger(StatusUpdateBenchmark.class)));
            bind(DriverFactory.class)
                .toInstance((s, credentials, frameworkInfo, master) -> new FakeSchedulerDriver());
            bind(OfferManager.class).toInstance(new FakeOfferManager());
            bind(TaskIdGenerator.class).to(TaskIdGenerator.TaskIdGeneratorImpl.class);
            bind(SchedulingFilter.class).to(SchedulingFilterImpl.class);
            bind(Command.class).annotatedWith(ShutdownStage.class).toInstance(Commands.NOOP);
            bind(Thread.UncaughtExceptionHandler.class).toInstance(
                (t, e) -> {
                  // no-op
                });
            bind(Storage.class).toInstance(storage);
            bind(DriverSettings.class).toInstance(
                new DriverSettings(
                    "fakemaster",
                    Optional.empty()));
            bind(FrameworkInfoFactory.class).toInstance(() -> Protos.FrameworkInfo.newBuilder()
                    .setUser("framework user")
                    .setName("test framework")
                    .build());
            bind(RescheduleCalculator.class).toInstance(new FakeRescheduleCalculator());
            bind(Clock.class).toInstance(new FakeClock());
            bind(ExecutorSettings.class).toInstance(TestExecutorSettings.THERMOS_EXECUTOR);
            bind(StatsProvider.class).toInstance(new FakeStatsProvider());
            bind(EventSink.class).toInstance(eventBus::post);
            bind(new TypeLiteral<BlockingQueue<Protos.TaskStatus>>() { })
                .annotatedWith(TaskStatusHandlerImpl.StatusUpdateQueue.class)
                .toInstance(new LinkedBlockingQueue<>());
            bind(new TypeLiteral<Integer>() { })
                .annotatedWith(TaskStatusHandlerImpl.MaxBatchSize.class)
                .toInstance(1000);
            bind(TaskStatusHandler.class).to(TaskStatusHandlerImpl.class);
            bind(TaskStatusHandlerImpl.class).in(Singleton.class);
            bind(IServerInfo.class).toInstance(IServerInfo.build(new ServerInfo("jmh", "")));
          }
        }
    );

    eventBus.register(injector.getInstance(ClusterStateImpl.class));
    scheduler = injector.getInstance(Scheduler.class);
    eventBus.register(this);

    statusHandler = injector.getInstance(TaskStatusHandlerImpl.class);
    statusHandler.startAsync();
  }

  @TearDown(Level.Trial)
  public void tearDown() {
    statusHandler.stopAsync();
  }

  /**
   * Runs before each iteration of the benchmark in order to vary the storage
   * latency across iterations, based on the latency parameter.
   */
  @Setup(Level.Iteration)
  public void setIterationLatency() {
    storage.setLatency(Amount.of(latencyMilliseconds, Time.MILLISECONDS));
  }

  /**
   * Runs before each invocation of the benchmark in order to store the tasks
   * that we will transition in the benchmark.
   */
  @Setup(Level.Invocation)
  public void createTasks() {
    tasks = new Tasks.Builder()
        .setScheduleStatus(ScheduleStatus.STARTING)
        .build(NUM_TASKS);

    storage.write(
        (NoResult.Quiet) storeProvider -> storeProvider.getUnsafeTaskStore().saveTasks(tasks));

    countDownLatch = new CountDownLatch(tasks.size());
  }

  @Subscribe
  public void taskChangedState(PubsubEvent.TaskStateChange stateChange) {
    countDownLatch.countDown();
  }

  @Benchmark
  public boolean runBenchmark() throws InterruptedException {
    for (String taskId : org.apache.aurora.scheduler.base.Tasks.ids(tasks)) {
      Protos.TaskStatus status = Protos.TaskStatus.newBuilder()
          .setState(Protos.TaskState.TASK_RUNNING)
          .setSource(Protos.TaskStatus.Source.SOURCE_EXECUTOR)
          .setMessage("message")
          .setTimestamp(1D)
          .setTaskId(Protos.TaskID.newBuilder().setValue(taskId).build())
          .build();

      scheduler.statusUpdate(new FakeSchedulerDriver(), ProtosConversion.convert(status));
    }

    // Wait for all task transitions to complete.
    countDownLatch.await();

    // Return an unguessable value.
    return System.currentTimeMillis() % 5 == 0;
  }
}
