// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements.  See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

//! Grpc implementation of [Report].

use super::{CollectItemConsume, CollectItemProduce};
#[cfg(feature = "management")]
use crate::proto::v3::management_service_client::ManagementServiceClient;
use crate::{
    proto::v3::{
        log_report_service_client::LogReportServiceClient,
        meter_report_service_client::MeterReportServiceClient,
        trace_segment_report_service_client::TraceSegmentReportServiceClient, LogData, MeterData,
        SegmentObject,
    },
    reporter::{CollectItem, Report},
};
use futures_core::Stream;
use futures_util::future::{try_join_all, TryJoinAll};
use std::{
    error::Error,
    future::{pending, Future},
    pin::Pin,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    task::{Context, Poll},
    time::Duration,
};
use tokio::{
    select,
    sync::{
        mpsc::{self, Receiver, Sender},
        Mutex,
    },
    task::JoinHandle,
    try_join,
};
use tokio_stream::StreamExt;
use tonic::{
    metadata::{Ascii, MetadataValue},
    service::{interceptor::InterceptedService, Interceptor},
    transport::{self, Channel, Endpoint},
    Request, Status,
};
use tracing::error;

type DynInterceptHandler = dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync;
type DynErrHandler = dyn Fn(&str, &dyn Error) + Send + Sync + 'static;
type DynStatusHandler = dyn Fn(&str, &Status) + Send + Sync + 'static;

fn default_err_handle(message: &str, err: &dyn Error) {
    error!(?err, "{}", message);
}

fn default_status_handle(message: &str, status: &Status) {
    error!(?status, "{}", message);
}

#[derive(Default, Clone)]
struct CustomInterceptor {
    authentication: Option<Arc<String>>,
    custom_intercept: Option<Arc<DynInterceptHandler>>,
}

impl Interceptor for CustomInterceptor {
    fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
        if let Some(authentication) = &self.authentication {
            if let Ok(authentication) = authentication.parse::<MetadataValue<Ascii>>() {
                request
                    .metadata_mut()
                    .insert("authentication", authentication);
            }
        }
        if let Some(custom_intercept) = &self.custom_intercept {
            request = custom_intercept(request)?;
        }
        Ok(request)
    }
}

struct State {
    is_reporting: AtomicBool,
    is_closing: AtomicBool,
}

impl State {
    fn is_closing(&self) -> bool {
        self.is_closing.load(Ordering::Relaxed)
    }
}

/// Reporter which will report to Skywalking OAP server via grpc protocol.
pub struct GrpcReporter<P, C> {
    state: Arc<State>,
    producer: Arc<P>,
    consumer: Arc<Mutex<Option<C>>>,
    err_handle: Arc<DynErrHandler>,
    channel: Channel,
    interceptor: CustomInterceptor,
}

impl GrpcReporter<mpsc::UnboundedSender<CollectItem>, mpsc::UnboundedReceiver<CollectItem>> {
    /// New with exists [Channel], so you can clone the [Channel] for multiplex.
    pub fn new(channel: Channel) -> Self {
        let (p, c) = mpsc::unbounded_channel();
        Self::new_with_pc(channel, p, c)
    }

    /// Connect to the Skywalking OAP server.
    pub async fn connect(
        address: impl TryInto<Endpoint, Error = transport::Error>,
    ) -> crate::Result<Self> {
        let endpoint = address.try_into()?;
        let channel = endpoint.connect().await?;
        Ok(Self::new(channel))
    }
}

impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
    /// Special purpose, used for user-defined produce and consume operations,
    /// usually you can use [GrpcReporter::connect] and [GrpcReporter::new].
    pub fn new_with_pc(channel: Channel, producer: P, consumer: C) -> Self {
        Self {
            state: Arc::new(State {
                is_reporting: Default::default(),
                is_closing: Default::default(),
            }),
            producer: Arc::new(producer),
            consumer: Arc::new(Mutex::new(Some(consumer))),
            err_handle: Arc::new(default_err_handle),
            channel,
            interceptor: Default::default(),
        }
    }

    /// Set error handle. By default, the error will not be handle.
    pub fn with_err_handle(
        mut self,
        handle: impl Fn(&str, &dyn Error) + Send + Sync + 'static,
    ) -> Self {
        self.err_handle = Arc::new(handle);
        self
    }

    /// Set the authentication header value. By default, the authentication is
    /// not set.
    pub fn with_authentication(mut self, authentication: impl Into<String>) -> Self {
        self.interceptor.authentication = Some(Arc::new(authentication.into()));
        self
    }

    /// Set the custom intercept. By default, the custom intercept is not set.
    pub fn with_custom_intercept(
        mut self,
        custom_intercept: impl Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static,
    ) -> Self {
        self.interceptor.custom_intercept = Some(Arc::new(custom_intercept));
        self
    }

    /// Start to reporting.
    ///
    /// # Panics
    ///
    /// Panic if call more than once.
    pub async fn reporting(&self) -> Reporting<C> {
        if self.state.is_reporting.swap(true, Ordering::Relaxed) {
            panic!("reporting already called");
        }

        let (trace_sender, trace_receiver) = mpsc::channel(255);
        let (log_sender, log_receiver) = mpsc::channel(255);
        let (meter_sender, meter_receiver) = mpsc::channel(255);

        let status_handle = Arc::new(default_status_handle);

        Reporting {
            report_sender: ReportSender {
                state: Arc::clone(&self.state),
                inner_report_sender: InnerReportSender {
                    status_handle: Arc::new(default_status_handle),
                    err_handle: self.err_handle.clone(),
                    trace_sender,
                    log_sender,
                    meter_sender,

                    #[cfg(feature = "management")]
                    management_client: ManagementServiceClient::with_interceptor(
                        self.channel.clone(),
                        self.interceptor.clone(),
                    ),
                },
                shutdown_signal: Box::pin(pending()),
                consumer: self.consumer.lock().await.take().unwrap(),
            },

            trace_receive_reporter: TraceReceiveReporter {
                trace_client: TraceSegmentReportServiceClient::with_interceptor(
                    self.channel.clone(),
                    self.interceptor.clone(),
                ),
                trace_receiver,
                status_handle: status_handle.clone(),
            },

            log_receive_reporter: LogReceiveReporter {
                log_client: LogReportServiceClient::with_interceptor(
                    self.channel.clone(),
                    self.interceptor.clone(),
                ),
                log_receiver,
                status_handle: status_handle.clone(),
            },

            meter_receive_reporter: MeterReceiveReporter {
                meter_client: MeterReportServiceClient::with_interceptor(
                    self.channel.clone(),
                    self.interceptor.clone(),
                ),
                meter_receiver,
                status_handle,
            },
        }
    }
}

impl<P, C> Clone for GrpcReporter<P, C> {
    fn clone(&self) -> Self {
        Self {
            state: self.state.clone(),
            producer: self.producer.clone(),
            consumer: self.consumer.clone(),
            err_handle: self.err_handle.clone(),
            channel: self.channel.clone(),
            interceptor: self.interceptor.clone(),
        }
    }
}

impl<P: CollectItemProduce, C: CollectItemConsume> Report for GrpcReporter<P, C> {
    fn report(&self, item: CollectItem) {
        if !self.state.is_closing() {
            if let Err(e) = self.producer.produce(item) {
                (self.err_handle)("report collect item failed", &*e);
            }
        }
    }
}

struct InnerReportSender {
    status_handle: Arc<DynStatusHandler>,
    err_handle: Arc<DynErrHandler>,

    trace_sender: Sender<SegmentObject>,
    log_sender: Sender<LogData>,
    meter_sender: Sender<MeterData>,

    #[cfg(feature = "management")]
    management_client: ManagementServiceClient<InterceptedService<Channel, CustomInterceptor>>,
}

impl InnerReportSender {
    async fn report(&mut self, item: CollectItem) {
        match item {
            CollectItem::Trace(item) => {
                if let Err(e) = self.trace_sender.try_send(*item) {
                    (self.err_handle)("report trace segment failed", &e as &dyn Error);
                }
            }
            CollectItem::Log(item) => {
                if let Err(e) = self.log_sender.try_send(*item) {
                    (self.err_handle)("report log data failed", &e as &dyn Error);
                }
            }
            CollectItem::Meter(item) => {
                if let Err(e) = self.meter_sender.try_send(*item) {
                    (self.err_handle)("report meter data failed", &e as &dyn Error);
                }
            }
            #[cfg(feature = "management")]
            CollectItem::Instance(item) => {
                if let Err(e) = self
                    .management_client
                    .report_instance_properties(*item)
                    .await
                {
                    (self.status_handle)("Report instance properties failed", &e);
                }
            }
            #[cfg(feature = "management")]
            CollectItem::Ping(item) => {
                if let Err(e) = self.management_client.keep_alive(*item).await {
                    (self.status_handle)("Ping failed", &e);
                }
            }
        }
    }
}

struct ReportSender<C> {
    state: Arc<State>,
    inner_report_sender: InnerReportSender,
    consumer: C,
    shutdown_signal: Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>,
}

impl<C: CollectItemConsume> ReportSender<C> {
    async fn start(self) -> crate::Result<()> {
        let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel();
        let ReportSender {
            state,
            mut inner_report_sender,
            consumer: mut collect_item_consumer,
            shutdown_signal,
            ..
        } = self;

        let work_fut = async move {
            loop {
                select! {
                    item = collect_item_consumer.consume() => {
                        match item {
                            Ok(Some(item)) => {
                                inner_report_sender.report(item).await;
                            }
                            Ok(None) => break,
                            Err(err) => return Err(crate::Error::Other(err)),
                        }
                    }
                    _ =  shutdown_rx.recv() => break,
                }
            }

            state.is_closing.store(true, Ordering::Relaxed);

            // Flush.
            loop {
                match collect_item_consumer.try_consume().await {
                    Ok(Some(item)) => {
                        inner_report_sender.report(item).await;
                    }
                    Ok(None) => break,
                    Err(err) => return Err(err.into()),
                }
            }

            Ok::<_, crate::Error>(())
        };

        let shutdown_fut = async move {
            shutdown_signal.await;
            shutdown_tx
                .send(())
                .map_err(|e| crate::Error::Other(Box::new(e)))?;
            Ok(())
        };

        try_join!(work_fut, shutdown_fut)?;

        Ok(())
    }
}

/// Handle of [GrpcReporter::reporting].
pub struct Reporting<C> {
    report_sender: ReportSender<C>,
    trace_receive_reporter: TraceReceiveReporter,
    log_receive_reporter: LogReceiveReporter,
    meter_receive_reporter: MeterReceiveReporter,
}

impl<C: CollectItemConsume> Reporting<C> {
    /// Quit when shutdown_signal received.
    ///
    /// Accept a `shutdown_signal` argument as a graceful shutdown signal.
    pub fn with_graceful_shutdown(
        mut self,
        shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
    ) -> Self {
        self.report_sender.shutdown_signal = Box::pin(shutdown_signal);
        self
    }

    /// Set the failed status handle. By default, the status will not be handle.
    pub fn with_status_handle(
        mut self,
        handle: impl Fn(&str, &Status) + Send + Sync + 'static,
    ) -> Self {
        let handle = Arc::new(handle);
        self.report_sender.inner_report_sender.status_handle = handle.clone();
        self.trace_receive_reporter.status_handle = handle.clone();
        self.log_receive_reporter.status_handle = handle.clone();
        self.meter_receive_reporter.status_handle = handle;
        self
    }

    /// Spawn the reporting in background.
    pub fn spawn(self) -> ReportingJoinHandle {
        ReportingJoinHandle {
            handles: try_join_all(vec![
                tokio::spawn(self.report_sender.start()),
                tokio::spawn(self.trace_receive_reporter.start()),
                tokio::spawn(self.log_receive_reporter.start()),
                tokio::spawn(self.meter_receive_reporter.start()),
            ]),
        }
    }
}

/// Handle of [Reporting::spawn].
pub struct ReportingJoinHandle {
    handles: TryJoinAll<JoinHandle<crate::Result<()>>>,
}

impl Future for ReportingJoinHandle {
    type Output = crate::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.handles).poll(cx).map(|rs| {
            let rs = rs?;
            for r in rs {
                r?;
            }
            Ok(())
        })
    }
}

struct TraceReceiveReporter {
    trace_client: TraceSegmentReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
    trace_receiver: Receiver<SegmentObject>,
    status_handle: Arc<DynStatusHandler>,
}

impl TraceReceiveReporter {
    async fn start(mut self) -> crate::Result<()> {
        let rf = ReceiveFrom::new(self.trace_receiver);
        while let Some(stream) = rf.stream() {
            if let Err(err) = self.trace_client.collect(stream).await {
                (self.status_handle)("Collect trace segment by stream failed", &err);
            }
        }
        Ok(())
    }
}

struct LogReceiveReporter {
    log_client: LogReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
    log_receiver: Receiver<LogData>,
    status_handle: Arc<DynStatusHandler>,
}

impl LogReceiveReporter {
    async fn start(mut self) -> crate::Result<()> {
        let rf = ReceiveFrom::new(self.log_receiver);
        while let Some(stream) = rf.stream() {
            if let Err(err) = self.log_client.collect(stream).await {
                (self.status_handle)("Collect log data by stream failed", &err);
            }
        }
        Ok(())
    }
}

struct MeterReceiveReporter {
    meter_client: MeterReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
    meter_receiver: Receiver<MeterData>,
    status_handle: Arc<DynStatusHandler>,
}

impl MeterReceiveReporter {
    async fn start(mut self) -> crate::Result<()> {
        let rf = ReceiveFrom::new(self.meter_receiver);
        while let Some(stream) = rf.stream() {
            if let Err(err) = self.meter_client.collect(stream).await {
                (self.status_handle)("Collect meter data by stream failed", &err);
            }
        }
        Ok(())
    }
}

struct ReceiveFrom<I> {
    receiver: Arc<Mutex<Receiver<I>>>,
    is_closed: Arc<AtomicBool>,
}

impl<I> ReceiveFrom<I> {
    fn new(receiver: Receiver<I>) -> Self {
        Self {
            receiver: Arc::new(Mutex::new(receiver)),
            is_closed: Default::default(),
        }
    }

    fn stream(&self) -> Option<impl Stream<Item = I>> {
        if self.is_closed.load(Ordering::Relaxed) {
            return None;
        }

        let is_closed = self.is_closed.clone();
        let receiver = self.receiver.clone();

        Some(
            ReceiveFromStream {
                receiver,
                is_closed,
            }
            .timeout(Duration::from_secs(30))
            .map_while(|item| item.ok()),
        )
    }
}

struct ReceiveFromStream<I> {
    receiver: Arc<Mutex<Receiver<I>>>,
    is_closed: Arc<AtomicBool>,
}

impl<I> Stream for ReceiveFromStream<I> {
    type Item = I;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.receiver.try_lock().unwrap().poll_recv(cx).map(|item| {
            if item.is_none() {
                self.is_closed.store(true, Ordering::Relaxed);
            }
            item
        })
    }
}
