// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::io::{self, Cursor, Error, ErrorKind, Read, Seek, SeekFrom, Write};
use std::sync::{Arc, Mutex};
use std::{cmp, fmt};

use crate::file::writer::TryClone;

/// This is object to use if your file is already in memory.
/// The sliceable cursor is similar to std::io::Cursor, except that it makes it easy to create "cursor slices".
/// To achieve this, it uses Arc instead of shared references. Indeed reference fields are painful
/// because the lack of Generic Associated Type implies that you would require complex lifetime propagation when
/// returning such a cursor.
#[allow(clippy::rc_buffer)]
pub struct SliceableCursor {
    inner: Arc<Vec<u8>>,
    start: u64,
    length: usize,
    pos: u64,
}

impl fmt::Debug for SliceableCursor {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SliceableCursor")
            .field("start", &self.start)
            .field("length", &self.length)
            .field("pos", &self.pos)
            .field("inner.len", &self.inner.len())
            .finish()
    }
}

impl SliceableCursor {
    pub fn new(content: impl Into<Arc<Vec<u8>>>) -> Self {
        let inner = content.into();
        let size = inner.len();
        SliceableCursor {
            inner,
            start: 0,
            pos: 0,
            length: size,
        }
    }

    /// Create a slice cursor using the same data as a current one.
    pub fn slice(&self, start: u64, length: usize) -> io::Result<Self> {
        let new_start = self.start + start;
        if new_start >= self.inner.len() as u64
            || new_start as usize + length > self.inner.len()
        {
            return Err(Error::new(ErrorKind::InvalidInput, "out of bound"));
        }
        Ok(SliceableCursor {
            inner: Arc::clone(&self.inner),
            start: new_start,
            pos: new_start,
            length,
        })
    }

    fn remaining_slice(&self) -> &[u8] {
        let end = self.start as usize + self.length;
        let offset = cmp::min(self.pos, end as u64) as usize;
        &self.inner[offset..end]
    }

    /// Get the length of the current cursor slice
    pub fn len(&self) -> u64 {
        self.length as u64
    }

    /// return true if the cursor is empty (self.len() == 0)
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

/// Implementation inspired by std::io::Cursor
impl Read for SliceableCursor {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = Read::read(&mut self.remaining_slice(), buf)?;
        self.pos += n as u64;
        Ok(n)
    }
}

impl Seek for SliceableCursor {
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        let new_pos = match pos {
            SeekFrom::Start(pos) => pos as i64,
            SeekFrom::End(pos) => self.inner.len() as i64 + pos as i64,
            SeekFrom::Current(pos) => self.pos as i64 + pos as i64,
        };

        if new_pos < 0 {
            Err(Error::new(
                ErrorKind::InvalidInput,
                format!(
                    "Request out of bounds: cur position {} + seek {:?} < 0: {}",
                    self.pos, pos, new_pos
                ),
            ))
        } else if new_pos >= self.inner.len() as i64 {
            Err(Error::new(
                ErrorKind::InvalidInput,
                format!(
                    "Request out of bounds: cur position {} + seek {:?} >= length {}: {}",
                    self.pos,
                    pos,
                    self.inner.len(),
                    new_pos
                ),
            ))
        } else {
            self.pos = new_pos as u64;
            Ok(self.start)
        }
    }
}

/// Use this type to write Parquet to memory rather than a file.
#[derive(Debug, Default, Clone)]
pub struct InMemoryWriteableCursor {
    buffer: Arc<Mutex<Cursor<Vec<u8>>>>,
}

impl InMemoryWriteableCursor {
    /// Consume this instance and return the underlying buffer as long as there are no other
    /// references to this instance.
    pub fn into_inner(self) -> Option<Vec<u8>> {
        Arc::try_unwrap(self.buffer)
            .ok()
            .and_then(|mutex| mutex.into_inner().ok())
            .map(|cursor| cursor.into_inner())
    }

    /// Returns a clone of the underlying buffer
    pub fn data(&self) -> Vec<u8> {
        let inner = self.buffer.lock().unwrap();
        inner.get_ref().to_vec()
    }
}

impl TryClone for InMemoryWriteableCursor {
    fn try_clone(&self) -> std::io::Result<Self> {
        Ok(Self {
            buffer: self.buffer.clone(),
        })
    }
}

impl Write for InMemoryWriteableCursor {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let mut inner = self.buffer.lock().unwrap();
        inner.write(buf)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        let mut inner = self.buffer.lock().unwrap();
        inner.flush()
    }
}

impl Seek for InMemoryWriteableCursor {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        let mut inner = self.buffer.lock().unwrap();
        inner.seek(pos)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Create a SliceableCursor of all u8 values in ascending order
    fn get_u8_range() -> SliceableCursor {
        let data: Vec<u8> = (0u8..=255).collect();
        SliceableCursor::new(data)
    }

    /// Reads all the bytes in the slice and checks that it matches the u8 range from start to end_included
    fn check_read_all(mut cursor: SliceableCursor, start: u8, end_included: u8) {
        let mut target = vec![];
        let cursor_res = cursor.read_to_end(&mut target);
        println!("{:?}", cursor_res);
        assert!(!cursor_res.is_err(), "reading error");
        assert_eq!((end_included - start) as usize + 1, cursor_res.unwrap());
        assert_eq!((start..=end_included).collect::<Vec<_>>(), target);
    }

    #[test]
    fn read_all_whole() {
        let cursor = get_u8_range();
        check_read_all(cursor, 0, 255);
    }

    #[test]
    fn read_all_slice() {
        let cursor = get_u8_range().slice(10, 10).expect("error while slicing");
        check_read_all(cursor, 10, 19);
    }

    #[test]
    fn seek_cursor_start() {
        let mut cursor = get_u8_range();

        cursor.seek(SeekFrom::Start(5)).unwrap();
        check_read_all(cursor, 5, 255);
    }

    #[test]
    fn seek_cursor_current() {
        let mut cursor = get_u8_range();
        cursor.seek(SeekFrom::Start(10)).unwrap();
        cursor.seek(SeekFrom::Current(10)).unwrap();
        check_read_all(cursor, 20, 255);
    }

    #[test]
    fn seek_cursor_end() {
        let mut cursor = get_u8_range();

        cursor.seek(SeekFrom::End(-10)).unwrap();
        check_read_all(cursor, 246, 255);
    }

    #[test]
    fn seek_cursor_error_too_long() {
        let mut cursor = get_u8_range();
        let res = cursor.seek(SeekFrom::Start(1000));
        let actual_error = res.expect_err("expected error").to_string();
        let expected_error =
            "Request out of bounds: cur position 0 + seek Start(1000) >= length 256: 1000";
        assert_eq!(actual_error, expected_error);
    }

    #[test]
    fn seek_cursor_error_too_short() {
        let mut cursor = get_u8_range();
        let res = cursor.seek(SeekFrom::End(-1000));
        let actual_error = res.expect_err("expected error").to_string();
        let expected_error =
            "Request out of bounds: cur position 0 + seek End(-1000) < 0: -744";
        assert_eq!(actual_error, expected_error);
    }
}
