/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.samza.logging.log4j2.serializers;

import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.util.Util;

/**
 * A JSON serde that serializes Log4J2 LogEvent objects into JSON using the
 * standard logstash LogEvent format defined <a
 * href="https://github.com/logstash/log4j-jsonevent-layout">here</a>.
 */
public class LoggingEventJsonSerde implements Serde<LogEvent> {
  /**
   * The JSON format version.
   */
  public static final int VERSION = 1;

  /**
   * The date format to use for the timestamp field.
   */
  public static final Format DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");

  // Have to wrap rather than extend due to type collisions between
  // Serde<LoggingEvent> and Serde<Object>.
  @SuppressWarnings("rawtypes")
  private final JsonSerde jsonSerde;

  /**
   * Defines whether to include LocationInfo data in the serialized
   * LoggingEvent. This information includes the file, line, and class that
   * wrote the log line.
   */
  private final boolean includeLocationInfo;

  /**
   * Constructs the serde without location info.
   */
  public LoggingEventJsonSerde() {
    this(false);
  }

  /**
   * Constructs the serde.
   *
   * @param includeLocationInfo
   *          Whether to include location info in the logging event or not.
   */
  @SuppressWarnings("rawtypes")
  public LoggingEventJsonSerde(boolean includeLocationInfo) {
    this.includeLocationInfo = includeLocationInfo;
    this.jsonSerde = new JsonSerde();
  }

  @SuppressWarnings("unchecked")
  @Override
  public byte[] toBytes(LogEvent loggingEvent) {
    Map<String, Object> loggingEventMap = encodeToMap(loggingEvent, includeLocationInfo);
    return jsonSerde.toBytes(loggingEventMap);
  }

  @SuppressWarnings("unchecked")
  @Override
  public LogEvent fromBytes(byte[] loggingEventMapBytes) {
    Map<String, Object> loggingEventMap = (Map<String, Object>) jsonSerde.fromBytes(loggingEventMapBytes);
    return decodeFromMap(loggingEventMap);
  }

  /**
   * Encodes a LoggingEvent into a HashMap using the logstash JSON format.
   *
   * @param loggingEvent
   *          The LoggingEvent to encode.
   * @param includeLocationInfo
   *          Whether to include LocationInfo in the map, or not.
   * @return A Map representing the LoggingEvent, which is suitable to be
   *         serialized by a JSON encoder such as Jackson.
   */
  @SuppressWarnings("rawtypes")
  public static Map<String, Object> encodeToMap(LogEvent loggingEvent, boolean includeLocationInfo) {
    Map<String, Object> logstashEvent = new LoggingEventJsonSerde.LoggingEventMap();
    String threadName = loggingEvent.getThreadName();
    long timestamp = loggingEvent.getTimeMillis();
    HashMap<String, Object> exceptionInformation = new HashMap<String, Object>();
    Map mdc = loggingEvent.getContextData().toMap();
    ThreadContext.ContextStack ndc = loggingEvent.getContextStack();

    logstashEvent.put("@version", VERSION);
    logstashEvent.put("@timestamp", dateFormat(timestamp));
    logstashEvent.put("source_host", getHostname());
    logstashEvent.put("message", loggingEvent.getMessage().getFormattedMessage());

    if (loggingEvent.getThrown() != null) {
      final Throwable throwableInformation = loggingEvent.getThrown();
      if (throwableInformation.getClass().getCanonicalName() != null) {
        exceptionInformation.put("exception_class", throwableInformation.getClass().getCanonicalName());
      }
      if (throwableInformation.getMessage() != null) {
        exceptionInformation.put("exception_message", throwableInformation.getMessage());
      }
      if (throwableInformation.getMessage() != null) {
        StringBuilder stackTrace = new StringBuilder(ExceptionUtils.getStackTrace(throwableInformation));
        exceptionInformation.put("stacktrace", stackTrace);
      }
      logstashEvent.put("exception", exceptionInformation);
    }

    if (includeLocationInfo) {
      StackTraceElement info = loggingEvent.getSource();
      logstashEvent.put("file", info.getFileName());
      logstashEvent.put("line_number", info.getLineNumber());
      logstashEvent.put("class", info.getClassName());
      logstashEvent.put("method", info.getMethodName());
    }

    logstashEvent.put("logger_name", loggingEvent.getLoggerName());
    logstashEvent.put("mdc", mdc);
    logstashEvent.put("ndc", ndc);
    logstashEvent.put("level", loggingEvent.getLevel().toString());
    logstashEvent.put("thread_name", threadName);

    return logstashEvent;
  }

  /**
   * This method is not currently implemented.
   *
   * @param loggingEventMap a map of logging events
   *
   * @return {@link LogEvent} decoded from the given logging event map.<br>
   *     Currently it throws an {@link UnsupportedOperationException} as the method is not implemented yet!
   */
  public static LogEvent decodeFromMap(Map<String, Object> loggingEventMap) {
    throw new UnsupportedOperationException("Unable to decode LoggingEvents.");
  }

  public static String dateFormat(long time) {
    return DATE_FORMAT.format(new Date(time));
  }

  /**
   * @return The hostname to use in the hostname field of the encoded
   *         LoggingEvents.
   */
  public static String getHostname() {
    try {
      return Util.getLocalHost().getHostName();
    } catch (Exception e) {
      return "unknown-host";
    }
  }

  /**
   * A helper class that only puts non-null values into the encoded LoggingEvent
   * map. This helps to shrink over-the-wire byte payloads for encoded
   * LoggingEvents.
   */
  @SuppressWarnings("serial")
  public static final class LoggingEventMap extends HashMap<String, Object> {
    public Object put(String key, Object value) {
      if (value == null) {
        return get(key);
      } else {
        return super.put(key, value);
      }
    }
  }
}
