#  Copyright (C) 2019-2020 Bloomberg Finance LP
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2 of the License, or (at your option) any later version.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
#  Authors:
#        Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
#
import os
import re
from typing import List, Dict, Tuple, Iterable, Optional
import grpc

from . import utils
from .node import MappingNode
from ._cas import CASRemote, CASCache
from ._exceptions import AssetCacheError, RemoteError
from ._remotespec import RemoteSpec, RemoteType
from ._remote import BaseRemote
from ._protos.build.bazel.remote.asset.v1 import remote_asset_pb2, remote_asset_pb2_grpc
from ._protos.google.rpc import code_pb2


class AssetRemote(BaseRemote):
    def __init__(self, spec):
        super().__init__(spec)

        # Remote Asset API stubs: assigned in _configure_protocols()
        # and reset to None again in close().
        self.fetch_service = self.push_service = None

    def close(self):
        # Forget both service stubs before delegating to the base class close()
        self.push_service = None
        self.fetch_service = None
        super().close()

    def _configure_protocols(self):
        # Bind the Remote Asset Fetch and Push stubs to this remote's channel
        channel = self.channel
        self.fetch_service = remote_asset_pb2_grpc.FetchStub(channel)
        self.push_service = remote_asset_pb2_grpc.PushStub(channel)

    # _check():
    #
    # Check if this remote provides everything required for the
    # particular kind of remote. This is expected to be called as part
    # of check()
    #
    # Raises:
    #     RemoteError: If the upstream has a problem
    #
    def _check(self):
        instance_name = self.spec.instance_name

        # Send an intentionally empty request through `rpc` and interpret
        # the resulting gRPC status.
        #
        # Args:
        #    rpc (callable): The stub method to invoke
        #    request: The empty request message to send
        #    service (str): Service name used in the error message
        #
        # Raises:
        #    RemoteError: If the service is unimplemented or fails with
        #                 any status other than INVALID_ARGUMENT
        #
        def probe(rpc, request, service):
            if instance_name:
                request.instance_name = instance_name

            try:
                rpc(request)
            except grpc.RpcError as e:
                code = e.code()
                if code == grpc.StatusCode.INVALID_ARGUMENT:
                    # Expected error as the request doesn't specify any URIs.
                    return

                # Chain the gRPC error so that the original cause is
                # preserved, as the fetch/push methods of this class do.
                if code == grpc.StatusCode.UNIMPLEMENTED:
                    raise RemoteError(
                        "Configured remote does not implement the Remote Asset "
                        "{} service. Please check remote configuration.".format(service)
                    ) from e

                raise RemoteError(
                    "Remote initialisation failed with status {}: {}".format(code.name, e.details())
                ) from e

        # The Fetch service is always required
        probe(self.fetch_service.FetchBlob, remote_asset_pb2.FetchBlobRequest(), "Fetch")

        # The Push service is only required when the spec enables pushing
        if self.spec.push:
            probe(self.push_service.PushBlob, remote_asset_pb2.PushBlobRequest(), "Push")

    # fetch_blob():
    #
    # Resolve URIs to a CAS blob digest.
    #
    # Args:
    #    uris (list of str): The URIs to resolve. Multiple URIs should represent
    #                        the same content available at different locations.
    #    qualifiers (list of Qualifier): Optional qualifiers sub-specifying the
    #                                    content to fetch.
    #
    # Returns
    #    (FetchBlobResponse): The asset server response or None if the resource
    #                         is not available.
    #
    # Raises:
    #     AssetCacheError: If the upstream has a problem
    #
    def fetch_blob(self, uris, *, qualifiers=None):
        # Assemble the request; instance name and qualifiers are optional
        request = remote_asset_pb2.FetchBlobRequest()
        instance_name = self.spec.instance_name
        if instance_name:
            request.instance_name = instance_name
        request.uris.extend(uris)
        if qualifiers:
            request.qualifiers.extend(qualifiers)

        try:
            response = self.fetch_service.FetchBlob(request)
        except grpc.RpcError as e:
            # A missing asset is reported as None, anything else is an error
            if e.code() != grpc.StatusCode.NOT_FOUND:
                raise AssetCacheError("FetchBlob failed with status {}: {}".format(e.code().name, e.details())) from e
            return None

        # The RPC itself succeeded, the outcome is carried in the response status
        status = response.status.code
        if status == code_pb2.NOT_FOUND:
            return None
        if status == code_pb2.OK:
            return response

        raise AssetCacheError("FetchBlob failed with response status {}".format(status))

    # fetch_directory():
    #
    # Resolve URIs to a CAS Directory digest.
    #
    # Args:
    #    uris (list of str): The URIs to resolve. Multiple URIs should represent
    #                        the same content available at different locations.
    #    qualifiers (list of Qualifier): Optional qualifiers sub-specifying the
    #                                    content to fetch.
    #
    # Returns
    #    (FetchDirectoryResponse): The asset server response or None if the resource
    #                              is not available.
    #
    # Raises:
    #     AssetCacheError: If the upstream has a problem
    #
    def fetch_directory(self, uris, *, qualifiers=None):
        # Assemble the request; instance name and qualifiers are optional
        request = remote_asset_pb2.FetchDirectoryRequest()
        instance_name = self.spec.instance_name
        if instance_name:
            request.instance_name = instance_name
        request.uris.extend(uris)
        if qualifiers:
            request.qualifiers.extend(qualifiers)

        try:
            response = self.fetch_service.FetchDirectory(request)
        except grpc.RpcError as e:
            # A missing asset is reported as None, anything else is an error
            if e.code() != grpc.StatusCode.NOT_FOUND:
                raise AssetCacheError(
                    "FetchDirectory failed with status {}: {}".format(e.code().name, e.details())
                ) from e
            return None

        # The RPC itself succeeded, the outcome is carried in the response status
        status = response.status.code
        if status == code_pb2.NOT_FOUND:
            return None
        if status == code_pb2.OK:
            return response

        raise AssetCacheError("FetchDirectory failed with response status {}".format(status))

    # push_blob():
    #
    # Associate a CAS blob digest to URIs.
    #
    # Args:
    #    uris (list of str): The URIs to associate with the blob digest.
    #    blob_digest (Digest): The CAS blob to associate.
    #    qualifiers (list of Qualifier): Optional qualifiers sub-specifying the
    #                                    content that is being pushed.
    #    references_blobs (list of Digest): Referenced blobs that need to not expire
    #                                       before expiration of this association.
    #    references_directories (list of Digest): Referenced directories that need to not expire
    #                                             before expiration of this association.
    #
    # Raises:
    #     AssetCacheError: If the upstream has a problem
    #
    def push_blob(self, uris, blob_digest, *, qualifiers=None, references_blobs=None, references_directories=None):
        request = remote_asset_pb2.PushBlobRequest()
        instance_name = self.spec.instance_name
        if instance_name:
            request.instance_name = instance_name
        request.uris.extend(uris)
        request.blob_digest.CopyFrom(blob_digest)

        # Optional repeated fields are only filled in when values were given
        optional_fields = (
            (request.qualifiers, qualifiers),
            (request.references_blobs, references_blobs),
            (request.references_directories, references_directories),
        )
        for field, values in optional_fields:
            if values:
                field.extend(values)

        try:
            self.push_service.PushBlob(request)
        except grpc.RpcError as e:
            message = "PushBlob failed with status {}: {}".format(e.code().name, e.details())
            raise AssetCacheError(message) from e

    # push_directory():
    #
    # Associate a CAS Directory digest to URIs.
    #
    # Args:
    #    uris (list of str): The URIs to associate with the directory digest.
    #    directory_digest (Digest): The CAS Directory to associate.
    #    qualifiers (list of Qualifier): Optional qualifiers sub-specifying the
    #                                    content that is being pushed.
    #    references_blobs (list of Digest): Referenced blobs that need to not expire
    #                                       before expiration of this association.
    #    references_directories (list of Digest): Referenced directories that need to not expire
    #                                             before expiration of this association.
    #
    # Raises:
    #     AssetCacheError: If the upstream has a problem
    #
    def push_directory(
        self, uris, directory_digest, *, qualifiers=None, references_blobs=None, references_directories=None
    ):
        request = remote_asset_pb2.PushDirectoryRequest()
        instance_name = self.spec.instance_name
        if instance_name:
            request.instance_name = instance_name
        request.uris.extend(uris)
        request.root_directory_digest.CopyFrom(directory_digest)

        # Optional repeated fields are only filled in when values were given
        optional_fields = (
            (request.qualifiers, qualifiers),
            (request.references_blobs, references_blobs),
            (request.references_directories, references_directories),
        )
        for field, values in optional_fields:
            if values:
                field.extend(values)

        try:
            self.push_service.PushDirectory(request)
        except grpc.RpcError as e:
            message = "PushDirectory failed with status {}: {}".format(e.code().name, e.details())
            raise AssetCacheError(message) from e


# RemotePair()
#
# A pair of remotes which corresponds to a RemoteSpec, we
# need separate remote objects for the index and the storage so
# we store them together for each RemoteSpec here.
#
# Either member of the RemotePair may be None, in the case that
# the user specified a different RemoteSpec for indexing and
# for storage.
#
# Both members may also be None, in the case that we were unable
# to establish a connection to this remote at initialization time.
#
class RemotePair:
    def __init__(self, cas: CASCache, spec: RemoteSpec):
        # Each member is only assigned once its check() has passed,
        # `error` holds the message of the first RemoteError encountered.
        self.index: Optional[AssetRemote] = None
        self.storage: Optional[CASRemote] = None
        self.error: Optional[str] = None

        wants_index = spec.remote_type in (RemoteType.INDEX, RemoteType.ALL)
        wants_storage = spec.remote_type in (RemoteType.STORAGE, RemoteType.ALL)

        try:
            if wants_index:
                candidate_index = AssetRemote(spec)
                candidate_index.check()
                self.index = candidate_index

            # Not reached if the index remote failed above
            if wants_storage:
                candidate_storage = CASRemote(spec, cas)
                candidate_storage.check()
                self.storage = candidate_storage
        except RemoteError as failure:
            self.error = str(failure)


# Base Asset Cache for Caches to derive from
#
class AssetCache:
    def __init__(self, context):
        self.context = context
        self.cas: CASCache = context.get_cascache()

        # Table of RemotePair objects
        self._remotes: Dict[RemoteSpec, RemotePair] = {}

        # Table of prioritized RemoteSpecs which are valid for each project
        self._project_specs: Dict[str, List[RemoteSpec]] = {}

        self._has_fetch_remotes: bool = False
        self._has_push_remotes: bool = False

        self._basedir = None

    # release_resources():
    #
    # Release resources used by AssetCache.
    #
    def release_resources(self):

        # Close all remotes and their gRPC channels
        for remote in self._remotes.values():
            if remote.index:
                remote.index.close()
            if remote.storage:
                remote.storage.close()

    # setup_remotes():
    #
    # Sets up which remotes to use
    #
    # Args:
    #    specs: The active remote specs
    #    project_specs: List of specs for each project
    #
    def setup_remotes(self, specs: Iterable[RemoteSpec], project_specs: Dict[str, List[RemoteSpec]]):

        # Hold on to the project specs
        self._project_specs = project_specs

        with self.context.messenger.timed_activity("Initializing remote caches", silent_nested=True):
            for spec in specs:
                # This can be called multiple times, ensure that we only try
                # to instantiate each remote once.
                #
                if spec in self._remotes:
                    continue

                remote = RemotePair(self.cas, spec)
                if remote.error:
                    self.context.messenger.warn("Failed to initialize remote {}: {}".format(spec.url, remote.error))

                self._remotes[spec] = remote

        # Determine overall existance of push or fetch remotes
        self._has_fetch_remotes = any(remote.storage for _, remote in self._remotes.items()) and any(
            remote.index for _, remote in self._remotes.items()
        )
        self._has_push_remotes = any(spec.push and remote.storage for spec, remote in self._remotes.items()) and any(
            spec.push and remote.index for spec, remote in self._remotes.items()
        )

    # get_remotes():
    #
    # List the index remotes and storage remotes available for a project,
    # optionally restricted to the remotes which can be pushed to
    #
    # Args:
    #    project_name: The project name
    #    push: Whether pushing is required for this remote
    #
    # Returns:
    #    index_remotes: The index remotes
    #    storage_remotes: The storage remotes
    #
    def get_remotes(self, project_name: str, push: bool) -> Tuple[List[AssetRemote], List[CASRemote]]:
        try:
            project_specs = self._project_specs[project_name]
        except KeyError:
            # Technically this shouldn't happen, but here is a defensive return none the less.
            return [], []

        index_remotes = []
        storage_remotes = []
        for spec in project_specs:

            if push and not spec.push:
                continue

            remote = self._remotes[spec]
            if remote.index:
                index_remotes.append(remote.index)
            if remote.storage:
                storage_remotes.append(remote.storage)

        return index_remotes, storage_remotes

    # has_fetch_remotes():
    #
    # Check whether any remote repositories are available for fetching.
    #
    # Args:
    #     plugin (Plugin): The Plugin to check
    #
    # Returns: True if any remote repositories are configured, False otherwise
    #
    def has_fetch_remotes(self, *, plugin=None):
        if not self._has_fetch_remotes:
            # No project has fetch remotes
            return False
        elif plugin is None:
            # At least one (sub)project has fetch remotes
            return True
        else:
            project = plugin._get_project()
            index_remotes, storage_remotes = self.get_remotes(project.name, False)

            # Check whether the specified element's project has fetch remotes
            return index_remotes and storage_remotes

    # has_push_remotes():
    #
    # Check whether any remote repositories are available for pushing.
    #
    # Args:
    #     plugin (Plugin): The Plugin to check
    #
    # Returns: True if any remote repository is configured, False otherwise
    #
    def has_push_remotes(self, *, plugin=None):
        if not self._has_push_remotes:
            # No project has push remotes
            return False
        elif plugin is None:
            # At least one (sub)project has push remotes
            return True
        else:
            project = plugin._get_project()
            index_remotes, storage_remotes = self.get_remotes(project.name, True)

            # Check whether the specified element's project has fetch remotes
            return bool(index_remotes and storage_remotes)

    # list_refs_mtimes()
    #
    # List refs in a directory, given a base path. Also returns the
    # associated mtimes
    #
    # Args:
    #    base_path (str): Base path to traverse over
    #    glob_expr (str|None): Optional glob expression to match against files
    #
    # Returns:
    #     (iter of (mtime, filename)): Iterator of tuples of mtime and ref path relative to base_path
    #
    def list_refs_mtimes(self, base_path, *, glob_expr=None):
        path = base_path
        if glob_expr is not None:
            globdir = os.path.dirname(glob_expr)
            if not any(c in "*?[" for c in globdir):
                # path prefix contains no globbing characters so
                # append the glob to optimise the os.walk()
                path = os.path.join(base_path, globdir)

        regexer = None
        if glob_expr:
            expression = utils._glob2re(glob_expr)
            regexer = re.compile(expression)

        for root, _, files in os.walk(path):
            for filename in files:
                ref_path = os.path.join(root, filename)
                relative_path = os.path.relpath(ref_path, base_path)  # Relative to refs head
                if regexer is None or regexer.match(relative_path):
                    # Obtain the mtime (the time a file was last modified)
                    yield (os.path.getmtime(ref_path), relative_path)

    # remove_ref()
    #
    # Removes a ref.
    #
    # This also takes care of pruning away directories which can
    # be removed after having removed the given ref.
    #
    # Args:
    #    ref (str): The ref to remove
    #
    # Raises:
    #    (AssetCacheError): If the ref didn't exist, or a system error
    #                       occurred while removing it
    #
    def remove_ref(self, ref):
        try:
            utils._remove_path_with_parents(self._basedir, ref)
        except FileNotFoundError as e:
            raise AssetCacheError("Could not find ref '{}'".format(ref)) from e
        except OSError as e:
            raise AssetCacheError("System error while removing ref '{}': {}".format(ref, e)) from e
