/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.apache.skywalking.oap.log.analyzer.dsl.spec.extractor;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import groovy.lang.Closure;
import groovy.lang.DelegatesTo;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.List;

import lombok.experimental.Delegate;
import org.apache.commons.lang3.StringUtils;
import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.TraceContext;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.extractor.slowsql.SlowSqlSpec;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamilyBuilder;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.DatabaseSlowStatementBuilder;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.DatabaseSlowStatement;
import org.apache.skywalking.oap.server.core.source.ServiceMeta;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.Objects.nonNull;
import static org.apache.skywalking.oap.server.library.util.StringUtil.isNotBlank;

/**
 * Extractor section of the log analyzer DSL.
 *
 * <p>Every public method first checks {@code BINDING.get().shouldAbort()} and does nothing when it
 * returns {@code true}. Otherwise the method copies the given value onto the log builder exposed by
 * the current binding ({@code BINDING.get().log()}), or, for {@link #metrics(Closure)} and
 * {@link #slowSql(Closure)}, runs the given closure against a delegate and forwards the result.
 */
public class ExtractorSpec extends AbstractSpec {
    private static final Logger LOGGER = LoggerFactory.getLogger(ExtractorSpec.class);

    /** Formats a local date-time as a second-precision time bucket, e.g. {@code 20240131235959}. */
    private static final DateTimeFormatter DTF = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    /** One converter per MAL config of the module; used when the binding has no metrics container. */
    private final List<MetricConvert> metricConverts;

    /** Delegate installed on the closure passed to {@link #slowSql(Closure)}. */
    private final SlowSqlSpec slowSql;

    private final NamingControl namingControl;

    private final SourceReceiver sourceReceiver;

    /**
     * Looks up the required core services and builds one {@link MetricConvert} per entry of
     * {@code moduleConfig.malConfigs()}.
     *
     * @param moduleManager used to find the {@link CoreModule} services
     * @param moduleConfig  configuration of the log analyzer module
     * @throws ModuleStartException declared by the constructor; propagated from the initialization
     *                              performed here
     */
    public ExtractorSpec(final ModuleManager moduleManager,
                         final LogAnalyzerModuleConfig moduleConfig) throws ModuleStartException {
        super(moduleManager, moduleConfig);

        final MeterSystem meterSystem =
            moduleManager.find(CoreModule.NAME).provider().getService(MeterSystem.class);

        metricConverts = moduleConfig.malConfigs()
                                     .stream()
                                     .map(it -> new MetricConvert(it, meterSystem))
                                     .collect(Collectors.toList());

        slowSql = new SlowSqlSpec(moduleManager(), moduleConfig());

        namingControl = moduleManager.find(CoreModule.NAME)
                .provider()
                .getService(NamingControl.class);

        sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
    }

    /**
     * Sets the service name of the current log.
     *
     * @param service the service name; ignored when {@code null}
     */
    @SuppressWarnings("unused")
    public void service(final String service) {
        if (BINDING.get().shouldAbort()) {
            return;
        }
        if (nonNull(service)) {
            BINDING.get().log().setService(service);
        }
    }

    /**
     * Sets the service instance name of the current log.
     *
     * @param instance the instance name; ignored when {@code null}
     */
    @SuppressWarnings("unused")
    public void instance(final String instance) {
        if (BINDING.get().shouldAbort()) {
            return;
        }
        if (nonNull(instance)) {
            BINDING.get().log().setServiceInstance(instance);
        }
    }

    /**
     * Sets the endpoint name of the current log.
     *
     * @param endpoint the endpoint name; ignored when {@code null}
     */
    @SuppressWarnings("unused")
    public void endpoint(final String endpoint) {
        if (BINDING.get().shouldAbort()) {
            return;
        }
        if (nonNull(endpoint)) {
            BINDING.get().log().setEndpoint(endpoint);
        }
    }

    /**
     * Appends tags to the current log.
     *
     * <p>Entries with a blank key, a {@code null} value, or a value whose string form is blank are
     * skipped. A {@link Collection} value is rendered as its non-null elements joined by
     * {@code ","}; any other value is rendered with {@link Objects#toString(Object)}.
     *
     * @param kv the tags to add; nothing happens when it is empty
     */
    @SuppressWarnings("unused")
    public void tag(final Map<String, ?> kv) {
        if (BINDING.get().shouldAbort()) {
            return;
        }
        if (CollectionUtils.isEmpty(kv)) {
            return;
        }
        final LogData.Builder logData = BINDING.get().log();
        final List<KeyStringValuePair> newTags =
            kv.entrySet()
              .stream()
              .filter(it -> isNotBlank(it.getKey()))
              .filter(it -> nonNull(it.getValue()) &&
                  isNotBlank(Objects.toString(it.getValue())))
              .map(it -> {
                  final Object val = it.getValue();
                  final String valStr = val instanceof Collection
                      ? Joiner.on(",").skipNulls().join((Collection<?>) val)
                      : Objects.toString(val);
                  return KeyStringValuePair.newBuilder()
                                           .setKey(it.getKey())
                                           .setValue(valStr)
                                           .build();
              })
              .collect(Collectors.toList());
        // Existing tags are kept; the new ones are appended after them.
        logData.setTags(logData.getTags().toBuilder().addAllData(newTags));
        BINDING.get().log(logData);
    }

    /**
     * Sets the trace id in the trace context of the current log, keeping the other trace context
     * fields.
     *
     * @param traceId the trace id; ignored when {@code null}
     */
    @SuppressWarnings("unused")
    public void traceId(final String traceId) {
        if (BINDING.get().shouldAbort()) {
            return;
        }
        if (nonNull(traceId)) {
            final LogData.Builder logData = BINDING.get().log();
            final TraceContext.Builder traceContext = logData.getTraceContext().toBuilder();
            traceContext.setTraceId(traceId);
            logData.setTraceContext(traceContext);
        }
    }

    /**
     * Sets the trace segment id in the trace context of the current log, keeping the other trace
     * context fields.
     *
     * @param segmentId the segment id; ignored when {@code null}
     */
    @SuppressWarnings("unused")
    public void segmentId(final String segmentId) {
        if (BINDING.get().shouldAbort()) {
            return;
        }
        if (nonNull(segmentId)) {
            final LogData.Builder logData = BINDING.get().log();
            final TraceContext.Builder traceContext = logData.getTraceContext().toBuilder();
            traceContext.setTraceSegmentId(segmentId);
            logData.setTraceContext(traceContext);
        }
    }

    /**
     * Sets the span id in the trace context of the current log, keeping the other trace context
     * fields.
     *
     * @param spanId the span id as a decimal integer string; ignored when {@code null}
     * @throws NumberFormatException if {@code spanId} is not a parsable {@code int}
     */
    @SuppressWarnings("unused")
    public void spanId(final String spanId) {
        if (BINDING.get().shouldAbort()) {
            return;
        }
        if (nonNull(spanId)) {
            final LogData.Builder logData = BINDING.get().log();
            final TraceContext.Builder traceContext = logData.getTraceContext().toBuilder();
            traceContext.setSpanId(Integer.parseInt(spanId));
            logData.setTraceContext(traceContext);
        }
    }

    /**
     * Sets the timestamp of the current log.
     *
     * @param timestamp the timestamp as a string of digits; silently ignored when {@code null} or
     *                  when {@link StringUtils#isNumeric(CharSequence)} rejects it
     */
    @SuppressWarnings("unused")
    public void timestamp(final String timestamp) {
        if (BINDING.get().shouldAbort()) {
            return;
        }
        if (nonNull(timestamp) && StringUtils.isNumeric(timestamp)) {
            BINDING.get().log().setTimestamp(Long.parseLong(timestamp));
        }
    }

    /**
     * Sets the layer of the current log.
     *
     * @param layer the layer name; ignored when {@code null}
     */
    @SuppressWarnings("unused")
    public void layer(final String layer) {
        if (BINDING.get().shouldAbort()) {
            return;
        }
        if (nonNull(layer)) {
            final LogData.Builder logData = BINDING.get().log();
            logData.setLayer(layer);
        }
    }

    /**
     * Builds one metric sample from the closure and hands it over for metric generation.
     *
     * <p>The closure is run with a fresh {@link SampleBuilder} as its delegate. The resulting sample
     * is wrapped in a {@link SampleFamily}. When the binding provides a metrics container the family
     * is added to it; otherwise it is passed to every configured {@link MetricConvert}, keyed by
     * the sample name.
     *
     * @param cl the closure that populates the sample
     */
    @SuppressWarnings("unused")
    public void metrics(@DelegatesTo(SampleBuilder.class) final Closure<?> cl) {
        if (BINDING.get().shouldAbort()) {
            return;
        }
        final SampleBuilder builder = new SampleBuilder();
        cl.setDelegate(builder);
        cl.call();

        final Sample sample = builder.build();
        final SampleFamily sampleFamily = SampleFamilyBuilder.newBuilder(sample).build();

        final Optional<List<SampleFamily>> possibleMetricsContainer = BINDING.get().metricsContainer();

        if (possibleMetricsContainer.isPresent()) {
            possibleMetricsContainer.get().add(sampleFamily);
        } else {
            metricConverts.forEach(it -> it.toMeter(
                    ImmutableMap.<String, SampleFamily>builder()
                            .put(sample.getName(), sampleFamily)
                            .build()
            ));
        }
    }

    /**
     * Extracts a database slow statement from the current log and sends it, together with the
     * owning service meta, to the {@link SourceReceiver}.
     *
     * <p>The layer, service and timestamp must already be set on the current log; otherwise a
     * warning is logged and nothing is sent. The closure is then run with the {@link SlowSqlSpec}
     * delegate, which is expected to fill in the id, latency and statement of the builder registered
     * on the binding; if any of those is still missing afterwards, a warning is logged and nothing
     * is sent.
     *
     * @param cl the closure that populates the slow statement
     */
    @SuppressWarnings("unused")
    public void slowSql(@DelegatesTo(SlowSqlSpec.class) final Closure<?> cl) {
        if (BINDING.get().shouldAbort()) {
            return;
        }
        final LogData.Builder log = BINDING.get().log();
        // Generated protobuf string getters return "" rather than null for unset fields, so a
        // null comparison can never detect a missing layer/service; check for blank instead.
        if (StringUtils.isBlank(log.getLayer())
                || StringUtils.isBlank(log.getService())
                || log.getTimestamp() < 1) {
            LOGGER.warn("SlowSql extracts failed, maybe something is not configured.");
            return;
        }
        final DatabaseSlowStatementBuilder builder = new DatabaseSlowStatementBuilder(namingControl);
        builder.setLayer(Layer.nameOf(log.getLayer()));

        // NOTE(review): the log timestamp is interpreted as epoch SECONDS here — confirm that this
        // matches the unit written by timestamp(String) callers.
        final LocalDateTime localDateTime = Instant.ofEpochSecond(log.getTimestamp())
                                                   .atZone(ZoneId.systemDefault())
                                                   .toLocalDateTime();
        final String timeBucket = DTF.format(localDateTime);
        builder.setTimeBucket(Long.parseLong(timeBucket));
        builder.setServiceName(log.getService());

        final ServiceMeta serviceMeta = new ServiceMeta();
        serviceMeta.setName(namingControl.formatServiceName(log.getService()));
        serviceMeta.setLayer(builder.getLayer());
        serviceMeta.setTimeBucket(builder.getTimeBucket());
        // Register the builder on the binding before running the closure.
        BINDING.get().databaseSlowStatement(builder);

        cl.setDelegate(slowSql);
        cl.call();

        if (builder.getId() == null
                || builder.getLatency() < 1
                || builder.getStatement() == null) {
            LOGGER.warn("SlowSql extracts failed, maybe something is not configured.");
            return;
        }

        final String entityId = serviceMeta.getEntityId();
        builder.prepare();
        final DatabaseSlowStatement databaseSlowStatement = builder.toDatabaseSlowStatement();
        databaseSlowStatement.setDatabaseServiceId(entityId);

        sourceReceiver.receive(databaseSlowStatement);
        sourceReceiver.receive(serviceMeta);
    }

    /**
     * Delegate for the closure passed to {@link ExtractorSpec#metrics(Closure)}. It forwards to a
     * {@link Sample.SampleBuilder} and provides a {@code labels} method that accepts values of any
     * type.
     */
    public static class SampleBuilder {
        @Delegate
        private final Sample.SampleBuilder sampleBuilder = Sample.builder();

        /**
         * Sets the labels of the sample, converting every value with
         * {@link Objects#toString(Object)}. Entries with a blank key or a {@code null} value are
         * dropped.
         *
         * @param labels the labels to set; must not be {@code null}
         * @return the underlying sample builder
         */
        @SuppressWarnings("unused")
        public Sample.SampleBuilder labels(final Map<String, ?> labels) {
            final Map<String, String> filtered =
                labels.entrySet()
                      .stream()
                      .filter(it -> isNotBlank(it.getKey()) && nonNull(it.getValue()))
                      .collect(
                          Collectors.toMap(Map.Entry::getKey,
                                           it -> Objects.toString(it.getValue()))
                      );
            return sampleBuilder.labels(ImmutableMap.copyOf(filtered));
        }
    }
}
