/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.dataflow.worker.logging;

import static org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level.DEBUG;
import static org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level.ERROR;
import static org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level.INFO;
import static org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level.OFF;
import static org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level.TRACE;
import static org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level.WARN;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.Logger;
import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableBiMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;

/**
 * Sets up {@link java.util.logging} configuration on the Dataflow worker with a rotating file
 * logger. The file logger uses the {@link DataflowWorkerLoggingHandler} format. A user can override
 * the logging level by customizing the options found within {@link DataflowWorkerLoggingOptions}. A
 * user can override the location by specifying the Java system property
 * "dataflow.worker.logging.basepath" and the file size in MB before rolling over to a new file by
 * specifying the Java system property "dataflow.worker. loggging.filesize_mb". The default log
 * level is INFO, the default location is a file named dataflow-json.log within the system temporary
 * directory and the default file size is 1 GB.
 */
public class DataflowWorkerLoggingInitializer {
  private static final String ROOT_LOGGER_NAME = "";

  @VisibleForTesting
  static final String DEFAULT_RUNNER_LOGGING_LOCATION =
      new File(System.getProperty("java.io.tmpdir"), "dataflow-json.log").getPath();

  @VisibleForTesting
  static final String DEFAULT_SDK_LOGGING_LOCATION =
      new File(System.getProperty("java.io.tmpdir"), "sdk-json.log").getPath();

  @VisibleForTesting
  public static final String RUNNER_FILEPATH_PROPERTY = "dataflow.worker.logging.filepath";

  @VisibleForTesting
  static final String SDK_FILEPATH_PROPERTY = "dataflow.worker.logging.sdkfilepath";

  private static final String FILESIZE_MB_PROPERTY = "dataflow.worker.logging.filesize_mb";

  private static final String SYSTEM_OUT_LOG_NAME = "System.out";
  private static final String SYSTEM_ERR_LOG_NAME = "System.err";

  static final ImmutableBiMap<Level, DataflowWorkerLoggingOptions.Level> LEVELS =
      ImmutableBiMap.<Level, DataflowWorkerLoggingOptions.Level>builder()
          .put(Level.OFF, OFF)
          .put(Level.SEVERE, ERROR)
          .put(Level.WARNING, WARN)
          .put(Level.INFO, INFO)
          .put(Level.FINE, DEBUG)
          .put(Level.FINEST, TRACE)
          .build();

  /**
   * This default log level is overridden by the log level found at {@code
   * DataflowWorkerLoggingOptions#getDefaultWorkerLogLevel()}.
   */
  private static final DataflowWorkerLoggingOptions.Level DEFAULT_LOG_LEVEL =
      LEVELS.get(Level.INFO);

  /* We need to store a reference to the configured loggers so that they are not
   * garbage collected. java.util.logging only has weak references to the loggers
   * so if they are garbage collection, our hierarchical configuration will be lost. */
  @SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
  private static List<Logger> configuredLoggers = Lists.newArrayList();

  private static DataflowWorkerLoggingHandler loggingHandler;
  private static DataflowWorkerLoggingHandler sdkLoggingHandler;
  private static PrintStream originalStdOut;
  private static PrintStream originalStdErr = System.err;
  private static boolean initialized = false;

  private static DataflowWorkerLoggingHandler makeLoggingHandler(
      String filepathProperty, String defaultFilePath) throws IOException {
    String filepath = System.getProperty(filepathProperty, defaultFilePath);
    int filesizeMb = Integer.parseInt(System.getProperty(FILESIZE_MB_PROPERTY, "1024"));
    DataflowWorkerLoggingHandler handler =
        new DataflowWorkerLoggingHandler(filepath, filesizeMb * 1024L * 1024L);
    handler.setLevel(Level.ALL);
    return handler;
  }

  public static PrintStream getOriginalStdErr() {
    return originalStdErr;
  }

  /** Sets up the initial logging configuration. */
  public static synchronized void initialize() {
    if (initialized) {
      return;
    }

    try {
      loggingHandler =
          makeLoggingHandler(RUNNER_FILEPATH_PROPERTY, DEFAULT_RUNNER_LOGGING_LOCATION);
      Logger rootLogger = LogManager.getLogManager().getLogger(ROOT_LOGGER_NAME);

      Level logLevel = getJulLevel(DEFAULT_LOG_LEVEL);
      rootLogger.setLevel(logLevel);
      rootLogger.addHandler(loggingHandler);

      originalStdOut = System.out;
      originalStdErr = System.err;
      System.setOut(
          JulHandlerPrintStreamAdapterFactory.create(
              loggingHandler, SYSTEM_OUT_LOG_NAME, Level.INFO));
      System.setErr(
          JulHandlerPrintStreamAdapterFactory.create(
              loggingHandler, SYSTEM_ERR_LOG_NAME, Level.SEVERE));

      // Initialize the SDK Logging Handler, which will only be used for the LoggingService
      sdkLoggingHandler = makeLoggingHandler(SDK_FILEPATH_PROPERTY, DEFAULT_SDK_LOGGING_LOCATION);

      initialized = true;
    } catch (SecurityException | IOException | NumberFormatException e) {
      throw new ExceptionInInitializerError(e);
    }
  }

  /** Reconfigures logging with the passed in options. */
  public static synchronized void configure(DataflowWorkerLoggingOptions options) {
    if (!initialized) {
      throw new RuntimeException("configure() called before initialize()");
    }
    if (options.getDefaultWorkerLogLevel() != null) {
      Level defaultLevel = getJulLevel(options.getDefaultWorkerLogLevel());
      LogManager.getLogManager().getLogger(ROOT_LOGGER_NAME).setLevel(defaultLevel);
    }

    if (options.getWorkerLogLevelOverrides() != null) {
      for (Map.Entry<String, DataflowWorkerLoggingOptions.Level> loggerOverride :
          options.getWorkerLogLevelOverrides().entrySet()) {
        Logger logger = Logger.getLogger(loggerOverride.getKey());
        logger.setLevel(getJulLevel(loggerOverride.getValue()));
        configuredLoggers.add(logger);
      }
    }

    // If the options specify a level for messages logged to System.out/err, we need to reconfigure
    // the corresponding stream adapter.
    if (options.getWorkerSystemOutMessageLevel() != null) {
      System.out.close();
      System.setOut(
          JulHandlerPrintStreamAdapterFactory.create(
              loggingHandler,
              SYSTEM_OUT_LOG_NAME,
              getJulLevel(options.getWorkerSystemOutMessageLevel())));
    }

    if (options.getWorkerSystemErrMessageLevel() != null) {
      System.err.close();
      System.setErr(
          JulHandlerPrintStreamAdapterFactory.create(
              loggingHandler,
              SYSTEM_ERR_LOG_NAME,
              getJulLevel(options.getWorkerSystemErrMessageLevel())));
    }
  }

  /**
   * Returns the underlying {@link DataflowWorkerLoggingHandler}.
   *
   * <p>Generally, code should just use logging interface.
   */
  public static DataflowWorkerLoggingHandler getLoggingHandler() {
    if (!initialized) {
      throw new RuntimeException("getLoggingHandler() called before initialize()");
    }
    return loggingHandler;
  }

  /**
   * Returns the underlying {@link DataflowWorkerLoggingHandler} for logs from the SDK.
   *
   * <p>Initializes Dataflow worker logging if not initialized already.
   */
  public static DataflowWorkerLoggingHandler getSdkLoggingHandler() {
    if (!initialized) {
      throw new RuntimeException("getSdkLoggingHandler() called before initialize()");
    }
    return sdkLoggingHandler;
  }

  private static Level getJulLevel(DataflowWorkerLoggingOptions.Level level) {
    return LEVELS.inverse().get(level);
  }

  @VisibleForTesting
  public static synchronized void reset() {
    if (!initialized) {
      return;
    }
    configuredLoggers = Lists.newArrayList();
    System.setOut(originalStdOut);
    System.setErr(originalStdErr);
    JulHandlerPrintStreamAdapterFactory.reset();
    initialized = false;
  }
}
