/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.rest;

import java.util.Map;
import joptsimple.OptionSet;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.metrics.ReadableMetricsRegistry;
import org.apache.samza.monitor.SamzaMonitorService;
import org.apache.samza.util.CommandLine;
import org.apache.samza.util.MetricsReporterLoader;
import org.apache.samza.util.Util;
import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.glassfish.jersey.servlet.ServletContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.Servlet;


/**
 * The main Java class for the Samza REST API. It runs an embedded Jetty server so it can be deployed as a Jar file.
 *
 * This class can be started from the command line by providing the --config-path parameter to a Samza REST config file
 * which will be used to configure the default resources exposed by the API.
 *
 * It can also be managed programmatically using the
 * {@link org.apache.samza.rest.SamzaRestService#addServlet(javax.servlet.Servlet, String)},
 * {@link #start()} and {@link #stop()} methods.
 */
public class SamzaRestService {

  private static final Logger log = LoggerFactory.getLogger(SamzaRestService.class);
  /** Source name under which the metrics registry is registered with every reporter. */
  private static final String METRICS_SOURCE = "SamzaRest";

  private final Server server;
  private final ServletContextHandler context;
  private final ReadableMetricsRegistry metricsRegistry;
  private final Map<String, MetricsReporter> metricsReporters;

  /**
   * Creates the service around an existing Jetty server. The supplied context is mounted at the root
   * path ("/") and installed as the handler of the server.
   *
   * @param server           the Jetty server to run.
   * @param metricsRegistry  the registry that is registered with every metrics reporter on {@link #start()}.
   * @param metricsReporters the metrics reporters, keyed by reporter name, started and stopped with the server.
   * @param context          the servlet context to which servlets are added.
   */
  public SamzaRestService(Server server,
      ReadableMetricsRegistry metricsRegistry,
      Map<String, MetricsReporter> metricsReporters,
      ServletContextHandler context) {
    this.server = server;
    this.metricsRegistry = metricsRegistry;
    this.metricsReporters = metricsReporters;
    this.context = context;
    this.context.setContextPath("/");
    server.setHandler(context);
  }

  /**
   * Command line interface to run the server.
   *
   * Any failure while configuring, starting or running the service is caught and logged here rather
   * than rethrown. The monitor service, if it was created, is always stopped before this method returns.
   *
   * @param args arguments supported by {@link org.apache.samza.util.CommandLine}.
   *             In particular, --config job.config.loader.properties.path and
   *             --config job.config.loader.factory are used to read the Samza REST config file.
   * @throws Exception if stopping the monitor service fails; start-up failures are logged, not thrown.
   */
  public static void main(String[] args)
      throws Exception {
    SamzaMonitorService monitorService = null;
    try {
      SamzaRestConfig config = parseConfig(args);
      ReadableMetricsRegistry metricsRegistry = new MetricsRegistryMap();
      log.info("Creating new SamzaRestService with config: {}", config);
      MetricsConfig metricsConfig = new MetricsConfig(config);
      Map<String, MetricsReporter> metricsReporters =
          MetricsReporterLoader.getMetricsReporters(metricsConfig, Util.getLocalHost().getHostName());
      SamzaRestService restService = new SamzaRestService(new Server(config.getPort()), metricsRegistry, metricsReporters,
          new ServletContextHandler(ServletContextHandler.SESSIONS));

      // Add applications
      SamzaRestApplication samzaRestApplication = new SamzaRestApplication(config);
      ServletContainer container = new ServletContainer(samzaRestApplication);
      restService.addServlet(container, "/*");

      monitorService = new SamzaMonitorService(config, metricsRegistry);
      monitorService.start();

      restService.runBlocking();
    } catch (Throwable t) {
      // Top-level boundary: log everything so the failure reason is not lost.
      log.error("Exception in main.", t);
    } finally {
      if (monitorService != null) {
        monitorService.stop();
      }
    }
  }

  /**
   * Reads the Samza REST configuration from command line parameters.
   * @param args  the command line parameters supported by {@link org.apache.samza.util.CommandLine}.
   * @return      the parsed configuration wrapped in a {@link SamzaRestConfig}.
   */
  private static SamzaRestConfig parseConfig(String[] args) {
    CommandLine cmd = new CommandLine();
    OptionSet options = cmd.parser().parse(args);
    Config cfg = cmd.loadConfig(options);
    return new SamzaRestConfig(new MapConfig(cfg));
  }

  /**
   * Adds the specified {@link javax.servlet.Servlet} to the server at the specified path.
   * @param servlet the {@link javax.servlet.Servlet} to be added.
   * @param path    the path for the servlet.
   */
  public void addServlet(Servlet servlet, String path) {
    log.info("Adding servlet {} for path {}", servlet, path);
    ServletHolder holder = new ServletHolder(servlet);
    context.addServlet(holder, path);
    holder.setInitOrder(0);
  }

  /**
   * Runs the server and waits for it to finish.
   *
   * When this method exits, the service is stopped first if the server is not already stopped (for
   * example because start-up failed or the wait was interrupted) and the server is then destroyed.
   * Failures during this cleanup are logged instead of thrown so they cannot replace the exception
   * that caused the exit.
   *
   * @throws Exception if the server could not be successfully started.
   */
  private void runBlocking()
      throws Exception {
    try {
      start();
      server.join();
    } finally {
      try {
        // destroy() is only attempted on a stopped server; Jetty handlers are expected to reject
        // destroy() in any other state, which would otherwise hide the original failure.
        if (!server.isStopped()) {
          stop();
        }
        server.destroy();
      } catch (Exception e) {
        log.error("Failed to cleanly shut down the server.", e);
      }
      log.info("Server terminated.");
    }
  }

  /**
   * Starts the server asynchronously. To stop the server, see {@link #stop()}.
   *
   * Every metrics reporter is registered with the metrics registry and started before the server itself.
   *
   * @throws Exception if the server could not be successfully started.
   */
  public void start()
      throws Exception {
    metricsReporters.forEach((reporterName, metricsReporter) -> {
      log.info("Registering the metrics reporter : {},  with source :  {}.", reporterName, METRICS_SOURCE);
      metricsReporter.register(METRICS_SOURCE, metricsRegistry);
      log.info("Starting the metrics reporter : {}.", reporterName);
      metricsReporter.start();
    });
    log.info("Starting server on port {}", describeListenPort());
    server.start();
    log.info("Server is running");
  }

  /**
   * Stops the server.
   *
   * The metrics reporters are stopped first. The server is stopped even if a reporter fails to stop,
   * in which case the reporter's exception is still propagated to the caller unless stopping the
   * server fails as well.
   *
   * @throws Exception if the server could not be successfully stopped.
   */
  public void stop()
      throws Exception {
    try {
      metricsReporters.forEach((reporterName, metricsReporter)  -> {
        log.info("Stopping the metrics reporter : {}.", reporterName);
        metricsReporter.stop();
      });
    } finally {
      log.info("Stopping server");
      server.stop();
      log.info("Server is stopped");
    }
  }

  /**
   * Describes the configured port of the first network connector of the server, for logging only.
   *
   * @return the port as a string, or "unknown" when the server has no {@link NetworkConnector}.
   */
  private String describeListenPort() {
    // Iterating as Object avoids assuming that a connector exists or that it is a network connector.
    for (Object connector : server.getConnectors()) {
      if (connector instanceof NetworkConnector) {
        return String.valueOf(((NetworkConnector) connector).getPort());
      }
    }
    return "unknown";
  }
}
