| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import logging |
| import time |
| from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING |
| |
| from flask import current_app |
| from retry.api import retry_call |
| from selenium.common.exceptions import TimeoutException, WebDriverException |
| from selenium.webdriver import chrome, firefox |
| from selenium.webdriver.common.by import By |
| from selenium.webdriver.remote.webdriver import WebDriver |
| from selenium.webdriver.support import expected_conditions as EC |
| from selenium.webdriver.support.ui import WebDriverWait |
| |
| from superset.extensions import machine_auth_provider_factory |
| |
# Browser window dimensions, used throughout this module as (width, height).
WindowSize = Tuple[int, int]
logger = logging.getLogger(__name__)

# Tuning knobs for how long we wait for a page to load and render.
# Interval in seconds. Not referenced in this part of the file -- presumably a
# polling interval used by callers (TODO confirm).
SELENIUM_CHECK_INTERVAL = 2
# Default `retries` for WebDriverProxy.get_screenshot, which forwards it as the
# number of attempts made to close the driver during teardown.
SELENIUM_RETRIES = 5
# Seconds to sleep after requesting the URL, before waiting on page elements.
SELENIUM_HEADSTART = 3
| |
| |
| if TYPE_CHECKING: |
| from flask_appbuilder.security.sqla.models import User |
| |
| |
class WebDriverProxy:
    """Create, authenticate and tear down a Selenium webdriver for screenshots.

    The proxy reads ``SCREENSHOT_LOCATE_WAIT`` and ``SCREENSHOT_LOAD_WAIT`` from
    the current Flask application's config at construction time, so it must be
    instantiated while an application context is active.
    """

    def __init__(
        self,
        driver_type: str,
        window: Optional[WindowSize] = None,
    ):
        """Store the driver flavour and window size.

        :param driver_type: ``"firefox"`` or ``"chrome"``; validated lazily by
            :meth:`create`.
        :param window: ``(width, height)`` of the browser window. Defaults to
            ``(800, 600)`` when omitted or falsy.
        """
        self._driver_type = driver_type
        self._window: WindowSize = window or (800, 600)
        self._screenshot_locate_wait = current_app.config["SCREENSHOT_LOCATE_WAIT"]
        self._screenshot_load_wait = current_app.config["SCREENSHOT_LOAD_WAIT"]

    def create(self) -> WebDriver:
        """Instantiate a new webdriver of the configured type.

        Extra command line arguments are taken from the
        ``WEBDRIVER_OPTION_ARGS`` config entry, and extra constructor keyword
        arguments from ``WEBDRIVER_CONFIGURATION`` (which may override
        ``options``).

        :returns: a freshly started webdriver instance
        :raises Exception: if the configured driver type is not supported
        """
        if self._driver_type == "firefox":
            driver_class = firefox.webdriver.WebDriver
            options = firefox.options.Options()
        elif self._driver_type == "chrome":
            driver_class = chrome.webdriver.WebDriver
            options = chrome.options.Options()
            # Only Chrome gets the size as a launch argument here; the window
            # is also resized explicitly in get_screenshot for every driver.
            options.add_argument(f"--window-size={self._window[0]},{self._window[1]}")
        else:
            raise Exception(f"Webdriver name ({self._driver_type}) not supported")

        # Add additional configured options
        for arg in current_app.config["WEBDRIVER_OPTION_ARGS"]:
            options.add_argument(arg)

        # Prepare args for the webdriver init
        kwargs: Dict[Any, Any] = {"options": options}
        kwargs.update(current_app.config["WEBDRIVER_CONFIGURATION"])
        logger.info("Init selenium driver")

        return driver_class(**kwargs)

    def auth(self, user: "User") -> WebDriver:
        """Create a webdriver and authenticate it as ``user``.

        :param user: the user on whose behalf the browser session is opened
        :returns: the webdriver returned by the machine auth provider
        :raises Exception: whatever :meth:`create` or the auth provider raises;
            the created driver is destroyed before re-raising
        """
        driver = self.create()
        try:
            return machine_auth_provider_factory.instance.authenticate_webdriver(
                driver, user
            )
        except Exception:
            # Do not leak the browser process when authentication fails.
            self.destroy(driver)
            raise

    @staticmethod
    def destroy(driver: WebDriver, tries: int = 2) -> None:
        """Destroy a driver.

        Teardown is best-effort: failures are logged at debug level and never
        propagated to the caller.

        :param driver: the webdriver to close and quit
        :param tries: number of attempts made to close the driver
        """
        # This is some very flaky code in selenium. Hence the retries
        # and catch-all exceptions
        try:
            retry_call(driver.close, tries=tries)
        except Exception:  # pylint: disable=broad-except
            logger.debug("Failed to close selenium driver", exc_info=True)
        try:
            driver.quit()
        except Exception:  # pylint: disable=broad-except
            logger.debug("Failed to quit selenium driver", exc_info=True)

    def get_screenshot(
        self,
        url: str,
        element_name: str,
        user: "User",
        retries: int = SELENIUM_RETRIES,
    ) -> Optional[bytes]:
        """Load ``url`` as ``user`` and take a PNG screenshot of an element.

        :param url: the page to load
        :param element_name: CSS class name of the element to capture
        :param user: the user the browser session is authenticated as
        :param retries: number of attempts made to close the driver afterwards
        :returns: PNG bytes of the element; PNG bytes of the whole page if the
            element capture raised a ``WebDriverException``; ``None`` if
            waiting timed out or the fallback capture failed as well
        """
        driver = self.auth(user)
        img: Optional[bytes] = None
        try:
            # Everything after authentication runs under ``finally`` so the
            # browser is always torn down, even if loading the page fails.
            driver.set_window_size(*self._window)
            driver.get(url)
            logger.debug("Sleeping for %i seconds", SELENIUM_HEADSTART)
            time.sleep(SELENIUM_HEADSTART)
            try:
                logger.debug("Wait for the presence of %s", element_name)
                element = WebDriverWait(driver, self._screenshot_locate_wait).until(
                    EC.presence_of_element_located((By.CLASS_NAME, element_name))
                )
                logger.debug("Wait for .loading to be done")
                WebDriverWait(driver, self._screenshot_load_wait).until_not(
                    EC.presence_of_all_elements_located((By.CLASS_NAME, "loading"))
                )
                logger.info("Taking a PNG screenshot of url %s", url)
                img = element.screenshot_as_png
            except TimeoutException:
                # Must precede WebDriverException, which is its base class.
                logger.error(
                    "Selenium timed out requesting url %s", url, exc_info=True
                )
            except WebDriverException as ex:
                logger.error(ex, exc_info=True)
                # Some webdrivers do not support screenshots for elements.
                # In such cases, take a screenshot of the entire page.
                try:
                    img = driver.get_screenshot_as_png()
                except WebDriverException as fallback_ex:
                    logger.error(fallback_ex, exc_info=True)
        finally:
            self.destroy(driver, retries)
        return img