# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
# isort:skip_file
from datetime import datetime, timedelta
from unittest.mock import Mock, patch, PropertyMock

from flask_babel import gettext as __
import pytest
from selenium.common.exceptions import WebDriverException
from slack import errors, WebClient

from tests.test_app import app
from superset import db
from superset.models.dashboard import Dashboard
from superset.models.schedules import (
    DashboardEmailSchedule,
    EmailDeliveryType,
    SliceEmailReportFormat,
    SliceEmailSchedule,
)
from superset.tasks.schedules import (
    create_webdriver,
    deliver_dashboard,
    deliver_slice,
    next_schedules,
)
from superset.models.slice import Slice
from tests.base_tests import SupersetTestCase
from tests.utils import read_fixture


class TestSchedules(SupersetTestCase):

    RECIPIENTS = "recipient1@superset.com, recipient2@superset.com"
    BCC = "bcc@superset.com"
    CSV = read_fixture("trends.csv")

    @classmethod
    def setUpClass(cls):
        with app.app_context():
            cls.common_data = dict(
                active=True,
                crontab="* * * * *",
                recipients=cls.RECIPIENTS,
                deliver_as_group=True,
                delivery_type=EmailDeliveryType.inline,
            )

            # Pick up a sample slice and dashboard
            slce = db.session.query(Slice).filter_by(slice_name="Participants").one()
            dashboard = (
                db.session.query(Dashboard)
                .filter_by(dashboard_title="World Bank's Data")
                .one()
            )

            dashboard_schedule = DashboardEmailSchedule(**cls.common_data)
            dashboard_schedule.dashboard_id = dashboard.id
            dashboard_schedule.user_id = 1
            db.session.add(dashboard_schedule)

            slice_schedule = SliceEmailSchedule(**cls.common_data)
            slice_schedule.slice_id = slce.id
            slice_schedule.user_id = 1
            slice_schedule.email_format = SliceEmailReportFormat.data
            slice_schedule.slack_channel = "#test_channel"

            db.session.add(slice_schedule)
            db.session.commit()

            cls.slice_schedule = slice_schedule.id
            cls.dashboard_schedule = dashboard_schedule.id

    @classmethod
    def tearDownClass(cls):
        with app.app_context():
            db.session.query(SliceEmailSchedule).filter_by(
                id=cls.slice_schedule
            ).delete()
            db.session.query(DashboardEmailSchedule).filter_by(
                id=cls.dashboard_schedule
            ).delete()
            db.session.commit()

    def test_crontab_scheduler(self):
        crontab = "* * * * *"

        start_at = datetime.now().replace(microsecond=0, second=0, minute=0)
        stop_at = start_at + timedelta(seconds=3600)

        # Fire off the task every minute
        schedules = list(next_schedules(crontab, start_at, stop_at, resolution=0))

        self.assertEqual(schedules[0], start_at)
        self.assertEqual(schedules[-1], stop_at - timedelta(seconds=60))
        self.assertEqual(len(schedules), 60)

        # Fire off the task every 10 minutes, controlled via resolution
        schedules = list(next_schedules(crontab, start_at, stop_at, resolution=10 * 60))

        self.assertEqual(schedules[0], start_at)
        self.assertEqual(schedules[-1], stop_at - timedelta(seconds=10 * 60))
        self.assertEqual(len(schedules), 6)

        # Fire off the task every 12 minutes, controlled via resolution
        schedules = list(next_schedules(crontab, start_at, stop_at, resolution=12 * 60))

        self.assertEqual(schedules[0], start_at)
        self.assertEqual(schedules[-1], stop_at - timedelta(seconds=12 * 60))
        self.assertEqual(len(schedules), 5)

    def test_wider_schedules(self):
        crontab = "*/15 2,10 * * *"

        for hour in range(0, 24):
            start_at = datetime.now().replace(
                microsecond=0, second=0, minute=0, hour=hour
            )
            stop_at = start_at + timedelta(seconds=3600)
            schedules = list(next_schedules(crontab, start_at, stop_at, resolution=0))

            if hour in (2, 10):
                self.assertEqual(len(schedules), 4)
            else:
                self.assertEqual(len(schedules), 0)

    def test_complex_schedule(self):
        # Run the job on every Friday of March and May
        # On these days, run the job at
        # 5:10 pm
        # 5:11 pm
        # 5:12 pm
        # 5:13 pm
        # 5:14 pm
        # 5:15 pm
        # 5:25 pm
        # 5:28 pm
        # 5:31 pm
        # 5:34 pm
        # 5:37 pm
        # 5:40 pm
        crontab = "10-15,25-40/3 17 * 3,5 5"
        start_at = datetime.strptime("2018/01/01", "%Y/%m/%d")
        stop_at = datetime.strptime("2018/12/31", "%Y/%m/%d")

        schedules = list(next_schedules(crontab, start_at, stop_at, resolution=60))
        self.assertEqual(len(schedules), 108)
        fmt = "%Y-%m-%d %H:%M:%S"
        self.assertEqual(schedules[0], datetime.strptime("2018-03-02 17:10:00", fmt))
        self.assertEqual(schedules[-1], datetime.strptime("2018-05-25 17:40:00", fmt))
        self.assertEqual(schedules[59], datetime.strptime("2018-03-30 17:40:00", fmt))
        self.assertEqual(schedules[60], datetime.strptime("2018-05-04 17:10:00", fmt))

    @patch("superset.tasks.schedules.firefox.webdriver.WebDriver")
    def test_create_driver(self, mock_driver_class):
        mock_driver = Mock()
        mock_driver_class.return_value = mock_driver
        mock_driver.find_elements_by_id.side_effect = [True, False]

        create_webdriver(db.session)
        mock_driver.add_cookie.assert_called_once()

    @patch("superset.tasks.schedules.firefox.webdriver.WebDriver")
    @patch("superset.tasks.schedules.send_email_smtp")
    @patch("superset.tasks.schedules.time")
    def test_deliver_dashboard_inline(self, mtime, send_email_smtp, driver_class):
        element = Mock()
        driver = Mock()
        mtime.sleep.return_value = None

        driver_class.return_value = driver

        # Ensure that we are able to login with the driver
        driver.find_elements_by_id.side_effect = [True, False]
        driver.find_element_by_class_name.return_value = element
        element.screenshot_as_png = read_fixture("sample.png")

        schedule = (
            db.session.query(DashboardEmailSchedule)
            .filter_by(id=self.dashboard_schedule)
            .one()
        )

        deliver_dashboard(
            schedule.dashboard_id,
            schedule.recipients,
            schedule.slack_channel,
            schedule.delivery_type,
            schedule.deliver_as_group,
        )

        mtime.sleep.assert_called_once()
        driver.screenshot.assert_not_called()
        send_email_smtp.assert_called_once()

    @patch("superset.tasks.schedules.firefox.webdriver.WebDriver")
    @patch("superset.tasks.schedules.send_email_smtp")
    @patch("superset.tasks.schedules.time")
    def test_deliver_dashboard_as_attachment(
        self, mtime, send_email_smtp, driver_class
    ):
        element = Mock()
        driver = Mock()
        mtime.sleep.return_value = None

        driver_class.return_value = driver

        # Ensure that we are able to login with the driver
        driver.find_elements_by_id.side_effect = [True, False]
        driver.find_element_by_id.return_value = element
        driver.find_element_by_class_name.return_value = element
        element.screenshot_as_png = read_fixture("sample.png")

        schedule = (
            db.session.query(DashboardEmailSchedule)
            .filter_by(id=self.dashboard_schedule)
            .one()
        )

        schedule.delivery_type = EmailDeliveryType.attachment

        deliver_dashboard(
            schedule.dashboard_id,
            schedule.recipients,
            schedule.slack_channel,
            schedule.delivery_type,
            schedule.deliver_as_group,
        )

        mtime.sleep.assert_called_once()
        driver.screenshot.assert_not_called()
        send_email_smtp.assert_called_once()
        self.assertIsNone(send_email_smtp.call_args[1]["images"])
        self.assertEqual(
            send_email_smtp.call_args[1]["data"]["screenshot"],
            element.screenshot_as_png,
        )

    @patch("superset.tasks.schedules.firefox.webdriver.WebDriver")
    @patch("superset.tasks.schedules.send_email_smtp")
    @patch("superset.tasks.schedules.time")
    def test_dashboard_chrome_like(self, mtime, send_email_smtp, driver_class):
        # Test functionality for chrome driver which does not support
        # element snapshots
        element = Mock()
        driver = Mock()
        mtime.sleep.return_value = None
        type(element).screenshot_as_png = PropertyMock(side_effect=WebDriverException)

        driver_class.return_value = driver

        # Ensure that we are able to login with the driver
        driver.find_elements_by_id.side_effect = [True, False]
        driver.find_element_by_id.return_value = element
        driver.find_element_by_class_name.return_value = element
        driver.screenshot.return_value = read_fixture("sample.png")

        schedule = (
            db.session.query(DashboardEmailSchedule)
            .filter_by(id=self.dashboard_schedule)
            .one()
        )

        deliver_dashboard(
            schedule.dashboard_id,
            schedule.recipients,
            schedule.slack_channel,
            schedule.delivery_type,
            schedule.deliver_as_group,
        )

        mtime.sleep.assert_called_once()
        driver.screenshot.assert_called_once()
        send_email_smtp.assert_called_once()

        self.assertEqual(send_email_smtp.call_args[0][0], self.RECIPIENTS)
        self.assertEqual(
            list(send_email_smtp.call_args[1]["images"].values())[0],
            driver.screenshot.return_value,
        )

    @patch("superset.tasks.schedules.firefox.webdriver.WebDriver")
    @patch("superset.tasks.schedules.send_email_smtp")
    @patch("superset.tasks.schedules.time")
    def test_deliver_email_options(self, mtime, send_email_smtp, driver_class):
        element = Mock()
        driver = Mock()
        mtime.sleep.return_value = None

        driver_class.return_value = driver

        # Ensure that we are able to login with the driver
        driver.find_elements_by_id.side_effect = [True, False]
        driver.find_element_by_class_name.return_value = element
        element.screenshot_as_png = read_fixture("sample.png")

        schedule = (
            db.session.query(DashboardEmailSchedule)
            .filter_by(id=self.dashboard_schedule)
            .one()
        )

        # Send individual mails to the group
        schedule.deliver_as_group = False

        # Set a bcc email address
        app.config["EMAIL_REPORT_BCC_ADDRESS"] = self.BCC

        deliver_dashboard(
            schedule.dashboard_id,
            schedule.recipients,
            schedule.slack_channel,
            schedule.delivery_type,
            schedule.deliver_as_group,
        )

        mtime.sleep.assert_called_once()
        driver.screenshot.assert_not_called()

        self.assertEqual(send_email_smtp.call_count, 2)
        self.assertEqual(send_email_smtp.call_args[1]["bcc"], self.BCC)

    @patch("superset.tasks.slack_util.WebClient.files_upload")
    @patch("superset.tasks.schedules.firefox.webdriver.WebDriver")
    @patch("superset.tasks.schedules.send_email_smtp")
    @patch("superset.tasks.schedules.time")
    def test_deliver_slice_inline_image(
        self, mtime, send_email_smtp, driver_class, files_upload
    ):
        element = Mock()
        driver = Mock()
        mtime.sleep.return_value = None

        driver_class.return_value = driver

        # Ensure that we are able to login with the driver
        driver.find_elements_by_id.side_effect = [True, False]
        driver.find_element_by_class_name.return_value = element
        element.screenshot_as_png = read_fixture("sample.png")

        schedule = (
            db.session.query(SliceEmailSchedule).filter_by(id=self.slice_schedule).one()
        )

        schedule.email_format = SliceEmailReportFormat.visualization
        schedule.delivery_format = EmailDeliveryType.inline

        deliver_slice(
            schedule.slice_id,
            schedule.recipients,
            schedule.slack_channel,
            schedule.delivery_type,
            schedule.email_format,
            schedule.deliver_as_group,
            db.session,
        )
        mtime.sleep.assert_called_once()
        driver.screenshot.assert_not_called()
        send_email_smtp.assert_called_once()

        self.assertEqual(
            list(send_email_smtp.call_args[1]["images"].values())[0],
            element.screenshot_as_png,
        )

        self.assertEqual(
            files_upload.call_args[1],
            {
                "channels": "#test_channel",
                "file": element.screenshot_as_png,
                "initial_comment": f"\n        *Participants*\n\n        <http://0.0.0.0:8080/superset/slice/{schedule.slice_id}/|Explore in Superset>\n        ",
                "title": "[Report]  Participants",
            },
        )

    @patch("superset.tasks.slack_util.WebClient.files_upload")
    @patch("superset.tasks.schedules.firefox.webdriver.WebDriver")
    @patch("superset.tasks.schedules.send_email_smtp")
    @patch("superset.tasks.schedules.time")
    def test_deliver_slice_attachment(
        self, mtime, send_email_smtp, driver_class, files_upload
    ):
        element = Mock()
        driver = Mock()
        mtime.sleep.return_value = None

        driver_class.return_value = driver

        # Ensure that we are able to login with the driver
        driver.find_elements_by_id.side_effect = [True, False]
        driver.find_element_by_class_name.return_value = element
        element.screenshot_as_png = read_fixture("sample.png")

        schedule = (
            db.session.query(SliceEmailSchedule).filter_by(id=self.slice_schedule).one()
        )

        schedule.email_format = SliceEmailReportFormat.visualization
        schedule.delivery_type = EmailDeliveryType.attachment

        deliver_slice(
            schedule.slice_id,
            schedule.recipients,
            schedule.slack_channel,
            schedule.delivery_type,
            schedule.email_format,
            schedule.deliver_as_group,
            db.session,
        )

        mtime.sleep.assert_called_once()
        driver.screenshot.assert_not_called()
        send_email_smtp.assert_called_once()

        self.assertEqual(
            send_email_smtp.call_args[1]["data"]["screenshot"],
            element.screenshot_as_png,
        )

        self.assertEqual(
            files_upload.call_args[1],
            {
                "channels": "#test_channel",
                "file": element.screenshot_as_png,
                "initial_comment": f"\n        *Participants*\n\n        <http://0.0.0.0:8080/superset/slice/{schedule.slice_id}/|Explore in Superset>\n        ",
                "title": "[Report]  Participants",
            },
        )

    @patch("superset.tasks.slack_util.WebClient.files_upload")
    @patch("superset.tasks.schedules.urllib.request.OpenerDirector.open")
    @patch("superset.tasks.schedules.urllib.request.urlopen")
    @patch("superset.tasks.schedules.send_email_smtp")
    def test_deliver_slice_csv_attachment(
        self, send_email_smtp, mock_open, mock_urlopen, files_upload
    ):
        response = Mock()
        mock_open.return_value = response
        mock_urlopen.return_value = response
        mock_urlopen.return_value.getcode.return_value = 200
        response.read.return_value = self.CSV

        schedule = (
            db.session.query(SliceEmailSchedule).filter_by(id=self.slice_schedule).one()
        )

        schedule.email_format = SliceEmailReportFormat.data
        schedule.delivery_type = EmailDeliveryType.attachment

        deliver_slice(
            schedule.slice_id,
            schedule.recipients,
            schedule.slack_channel,
            schedule.delivery_type,
            schedule.email_format,
            schedule.deliver_as_group,
            db.session,
        )

        send_email_smtp.assert_called_once()

        file_name = __("%(name)s.csv", name=schedule.slice.slice_name)

        self.assertEqual(send_email_smtp.call_args[1]["data"][file_name], self.CSV)

        self.assertEqual(
            files_upload.call_args[1],
            {
                "channels": "#test_channel",
                "file": self.CSV,
                "initial_comment": f"\n        *Participants*\n\n        <http://0.0.0.0:8080/superset/slice/{schedule.slice_id}/|Explore in Superset>\n        ",
                "title": "[Report]  Participants",
            },
        )

    @patch("superset.tasks.slack_util.WebClient.files_upload")
    @patch("superset.tasks.schedules.urllib.request.urlopen")
    @patch("superset.tasks.schedules.urllib.request.OpenerDirector.open")
    @patch("superset.tasks.schedules.send_email_smtp")
    def test_deliver_slice_csv_inline(
        self, send_email_smtp, mock_open, mock_urlopen, files_upload
    ):
        response = Mock()
        mock_open.return_value = response
        mock_urlopen.return_value = response
        mock_urlopen.return_value.getcode.return_value = 200
        response.read.return_value = self.CSV
        schedule = (
            db.session.query(SliceEmailSchedule).filter_by(id=self.slice_schedule).one()
        )

        schedule.email_format = SliceEmailReportFormat.data
        schedule.delivery_type = EmailDeliveryType.inline

        deliver_slice(
            schedule.slice_id,
            schedule.recipients,
            schedule.slack_channel,
            schedule.delivery_type,
            schedule.email_format,
            schedule.deliver_as_group,
            db.session,
        )

        send_email_smtp.assert_called_once()

        self.assertIsNone(send_email_smtp.call_args[1]["data"])
        self.assertTrue("<table " in send_email_smtp.call_args[0][2])

        self.assertEqual(
            files_upload.call_args[1],
            {
                "channels": "#test_channel",
                "file": self.CSV,
                "initial_comment": f"\n        *Participants*\n\n        <http://0.0.0.0:8080/superset/slice/{schedule.slice_id}/|Explore in Superset>\n        ",
                "title": "[Report]  Participants",
            },
        )


def test_slack_client_compatibility():
    c2 = WebClient()
    # slackclient >2.5.0 raises TypeError: a bytes-like object is required, not 'str
    # and requires to path a filepath instead of the bytes directly
    with pytest.raises(errors.SlackApiError):
        c2.files_upload(channels="#bogdan-test2", file=b"blabla", title="Test upload")
