/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.db.audit;

import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.protocol.session.ClientSession;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.validation.constraints.NotNull;

import java.time.ZoneId;
import java.util.List;

import static org.apache.iotdb.db.pipe.receiver.legacy.loader.ILoader.SCHEMA_FETCHER;

public class AuditLogger {
  private static final Logger logger = LoggerFactory.getLogger(AuditLogger.class);
  private static final Logger AUDIT_LOGGER =
      LoggerFactory.getLogger(IoTDBConstant.AUDIT_LOGGER_NAME);

  private static final String LOG = "log";
  private static final String USERNAME = "username";
  private static final String ADDRESS = "address";
  private static final String AUDIT_LOG_DEVICE = "root.__system.audit._%s";
  private static final Coordinator COORDINATOR = Coordinator.getInstance();
  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
  private static final List<AuditLogStorage> auditLogStorageList = config.getAuditLogStorage();
  private static final SessionInfo sessionInfo =
      new SessionInfo(0, AuthorityChecker.SUPER_USER, ZoneId.systemDefault());

  private static final List<AuditLogOperation> auditLogOperationList =
      config.getAuditLogOperation();

  private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();

  private static final DataNodeDevicePathCache DEVICE_PATH_CACHE =
      DataNodeDevicePathCache.getInstance();

  private AuditLogger() {
    // Empty constructor
  }

  @NotNull
  private static InsertRowStatement generateInsertStatement(
      String log, String address, String username)
      throws IoTDBConnectionException, IllegalPathException, QueryProcessException {
    InsertRowStatement insertStatement = new InsertRowStatement();
    insertStatement.setDevicePath(
        DEVICE_PATH_CACHE.getPartialPath(String.format(AUDIT_LOG_DEVICE, username)));
    insertStatement.setTime(CommonDateTimeUtils.currentTime());
    insertStatement.setMeasurements(new String[] {LOG, USERNAME, ADDRESS});
    insertStatement.setAligned(false);
    insertStatement.setValues(
        new Object[] {
          new Binary(log, TSFileConfig.STRING_CHARSET),
          new Binary(username, TSFileConfig.STRING_CHARSET),
          new Binary(address, TSFileConfig.STRING_CHARSET)
        });
    insertStatement.setDataTypes(
        new TSDataType[] {TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT});
    return insertStatement;
  }

  public static void log(String log, Statement statement) {
    AuditLogOperation operation = judgeLogOperation(statement.getType());
    IClientSession currSession = SessionManager.getInstance().getCurrSession();
    String username = "";
    String address = "";
    if (currSession != null) {
      ClientSession clientSession = (ClientSession) currSession;
      String clientAddress = clientSession.getClientAddress();
      int clientPort = ((ClientSession) currSession).getClientPort();
      address = String.format("%s:%s", clientAddress, clientPort);
      username = currSession.getUsername();
    }

    if (auditLogOperationList.contains(operation)) {
      if (auditLogStorageList.contains(AuditLogStorage.IOTDB)) {
        try {
          COORDINATOR.executeForTreeModel(
              generateInsertStatement(log, address, username),
              SESSION_MANAGER.requestQueryId(),
              sessionInfo,
              "",
              ClusterPartitionFetcher.getInstance(),
              SCHEMA_FETCHER);
        } catch (IllegalPathException | IoTDBConnectionException | QueryProcessException e) {
          logger.error("write audit log series error,", e);
        }
      }
      if (auditLogStorageList.contains(AuditLogStorage.LOGGER)) {
        AUDIT_LOGGER.info("user:{},address:{},log:{}", username, address, log);
      }
    }
  }

  public static void log(String log, Statement statement, boolean isNativeApi) {
    if (isNativeApi) {
      if (config.isEnableAuditLogForNativeInsertApi()) {
        log(log, statement);
      }
    } else {
      log(log, statement);
    }
  }

  private static AuditLogOperation judgeLogOperation(StatementType type) {
    switch (type) {
      case AUTHOR:
      case CREATE_USER:
      case DELETE_USER:
      case MODIFY_PASSWORD:
      case GRANT_USER_PRIVILEGE:
      case REVOKE_USER_PRIVILEGE:
      case GRANT_USER_ROLE:
      case REVOKE_USER_ROLE:
      case CREATE_ROLE:
      case DELETE_ROLE:
      case GRANT_ROLE_PRIVILEGE:
      case REVOKE_ROLE_PRIVILEGE:
      case GRANT_WATERMARK_EMBEDDING:
      case REVOKE_WATERMARK_EMBEDDING:
      case STORAGE_GROUP_SCHEMA:
      case DELETE_STORAGE_GROUP:
      case CREATE_TIME_SERIES:
      case CREATE_ALIGNED_TIME_SERIES:
      case CREATE_MULTI_TIME_SERIES:
      case DELETE_TIME_SERIES:
      case ALTER_TIME_SERIES:
      case CHANGE_ALIAS:
      case CHANGE_TAG_OFFSET:
      case CREATE_FUNCTION:
      case DROP_FUNCTION:
      case CREATE_INDEX:
      case DROP_INDEX:
      case QUERY_INDEX:
      case CREATE_TRIGGER:
      case DROP_TRIGGER:
      case CREATE_TEMPLATE:
      case SET_TEMPLATE:
      case MERGE:
      case FULL_MERGE:
      case MNODE:
      case MEASUREMENT_MNODE:
      case STORAGE_GROUP_MNODE:
      case AUTO_CREATE_DEVICE_MNODE:
      case TTL:
      case FLUSH:
      case CLEAR_CACHE:
      case DELETE_PARTITION:
      case LOAD_CONFIGURATION:
      case CREATE_SCHEMA_SNAPSHOT:
      case CREATE_CONTINUOUS_QUERY:
      case DROP_CONTINUOUS_QUERY:
      case SET_SYSTEM_MODE:
      case UNSET_TEMPLATE:
      case PRUNE_TEMPLATE:
      case APPEND_TEMPLATE:
      case DROP_TEMPLATE:
      case CREATE_PIPESINK:
      case DROP_PIPESINK:
      case CREATE_PIPE:
      case START_PIPE:
      case STOP_PIPE:
      case DROP_PIPE:
      case DEACTIVATE_TEMPLATE:
      case CREATE_PIPEPLUGIN:
      case DROP_PIPEPLUGIN:
      case CREATE_LOGICAL_VIEW:
      case ALTER_LOGICAL_VIEW:
      case DELETE_LOGICAL_VIEW:
      case RENAME_LOGICAL_VIEW:
      case CREATE_TOPIC:
      case DROP_TOPIC:
        return AuditLogOperation.DDL;
      case LOAD_DATA:
      case INSERT:
      case BATCH_INSERT:
      case BATCH_INSERT_ROWS:
      case BATCH_INSERT_ONE_DEVICE:
      case MULTI_BATCH_INSERT:
      case PIPE_ENRICHED:
      case DELETE:
      case SELECT_INTO:
      case LOAD_FILES:
      case REMOVE_FILE:
      case UNLOAD_FILE:
      case ACTIVATE_TEMPLATE:
      case SETTLE:
      case INTERNAL_CREATE_TIMESERIES:
      case START_REPAIR_DATA:
      case STOP_REPAIR_DATA:
        return AuditLogOperation.DML;
      case LIST_USER:
      case LIST_ROLE:
      case LIST_USER_PRIVILEGE:
      case LIST_ROLE_PRIVILEGE:
      case LIST_USER_ROLES:
      case LIST_ROLE_USERS:
      case QUERY:
      case LAST:
      case GROUP_BY_TIME:
      case GROUP_BY_FILL:
      case AGGREGATION:
      case FILL:
      case UDAF:
      case UDTF:
      case SHOW:
      case SHOW_PIPES:
      case SHOW_TOPICS:
      case SHOW_SUBSCRIPTIONS:
      case SHOW_MERGE_STATUS:
      case KILL:
      case TRACING:
      case SHOW_CONTINUOUS_QUERIES:
      case SHOW_SCHEMA_TEMPLATE:
      case SHOW_NODES_IN_SCHEMA_TEMPLATE:
      case SHOW_PATH_SET_SCHEMA_TEMPLATE:
      case SHOW_PATH_USING_SCHEMA_TEMPLATE:
      case SHOW_QUERY_RESOURCE:
      case FETCH_SCHEMA:
      case COUNT:
      case SHOW_TRIGGERS:
      case SHOW_PIPEPLUGINS:
        return AuditLogOperation.QUERY;
      default:
        logger.error("Unrecognizable operator type ({}) for audit log", type);
        return AuditLogOperation.NULL;
    }
  }
}
