################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.common.execution_config import ExecutionConfig
from pyflink.common.job_execution_result import JobExecutionResult
from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration
from pyflink.java_gateway import get_gateway
from pyflink.util.utils import load_java_class


class ExecutionEnvironment(object):
    """
    The ExecutionEnvironment is the context in which a program is executed.

    The environment provides methods to control the job execution (such as setting the parallelism)
    and to interact with the outside world (data access).
    """

    def __init__(self, j_execution_environment):
        self._j_execution_environment = j_execution_environment

    def get_parallelism(self) -> int:
        """
        Gets the parallelism with which operation are executed by default.

        :return: The parallelism.
        """
        return self._j_execution_environment.getParallelism()

    def set_parallelism(self, parallelism: int):
        """
        Sets the parallelism for operations executed through this environment.
        Setting a parallelism of x here will cause all operators to run with
        x parallel instances.

        :param parallelism: The parallelism.
        """
        self._j_execution_environment.setParallelism(parallelism)

    def get_default_local_parallelism(self) -> int:
        """
        Gets the default parallelism that will be used for the local execution environment.

        :return: The parallelism.
        """
        return self._j_execution_environment.getDefaultLocalParallelism()

    def set_default_local_parallelism(self, parallelism: int):
        """
        Sets the default parallelism that will be used for the local execution environment.

        :param parallelism: The parallelism.
        """
        self._j_execution_environment.setDefaultLocalParallelism(parallelism)

    def get_config(self) -> ExecutionConfig:
        """
        Gets the config object that defines execution parameters.

        :return: An :class:`ExecutionConfig` object, the environment's execution configuration.
        """
        return ExecutionConfig(self._j_execution_environment.getConfig())

    def set_restart_strategy(self, restart_strategy_configuration: RestartStrategyConfiguration):
        """
        Sets the restart strategy configuration. The configuration specifies which restart strategy
        will be used for the execution graph in case of a restart.

        Example:
        ::

            >>> env.set_restart_strategy(RestartStrategies.no_restart())

        :param restart_strategy_configuration: Restart strategy configuration to be set.
        """
        self._j_execution_environment.setRestartStrategy(
            restart_strategy_configuration._j_restart_strategy_configuration)

    def get_restart_strategy(self) -> RestartStrategyConfiguration:
        """
        Returns the specified restart strategy configuration.

        :return: The restart strategy configuration to be used.
        """
        return RestartStrategies._from_j_restart_strategy(
            self._j_execution_environment.getRestartStrategy())

    def add_default_kryo_serializer(self, type_class_name: str, serializer_class_name: str):
        """
        Adds a new Kryo default serializer to the Runtime.

        Example:
        ::

            >>> env.add_default_kryo_serializer("com.aaa.bbb.TypeClass", "com.aaa.bbb.Serializer")

        :param type_class_name: The full-qualified java class name of the types serialized with the
                                given serializer.
        :param serializer_class_name: The full-qualified java class name of the serializer to use.
        """
        type_clz = load_java_class(type_class_name)
        j_serializer_clz = load_java_class(serializer_class_name)
        self._j_execution_environment.addDefaultKryoSerializer(type_clz, j_serializer_clz)

    def register_type_with_kryo_serializer(self, type_class_name: str, serializer_class_name: str):
        """
        Registers the given Serializer via its class as a serializer for the given type at the
        KryoSerializer.

        Example:
        ::

            >>> env.register_type_with_kryo_serializer("com.aaa.bbb.TypeClass",
            ...                                        "com.aaa.bbb.Serializer")

        :param type_class_name: The full-qualified java class name of the types serialized with
                                the given serializer.
        :param serializer_class_name: The full-qualified java class name of the serializer to use.
        """
        type_clz = load_java_class(type_class_name)
        j_serializer_clz = load_java_class(serializer_class_name)
        self._j_execution_environment.registerTypeWithKryoSerializer(type_clz, j_serializer_clz)

    def register_type(self, type_class_name: str):
        """
        Registers the given type with the serialization stack. If the type is eventually
        serialized as a POJO, then the type is registered with the POJO serializer. If the
        type ends up being serialized with Kryo, then it will be registered at Kryo to make
        sure that only tags are written.

        Example:
        ::

            >>> env.register_type("com.aaa.bbb.TypeClass")

        :param type_class_name: The full-qualified java class name of the type to register.
        """
        type_clz = load_java_class(type_class_name)
        self._j_execution_environment.registerType(type_clz)

    def execute(self, job_name: str = None) -> JobExecutionResult:
        """
        Triggers the program execution. The environment will execute all parts of the program that
        have resulted in a "sink" operation.

        The program execution will be logged and displayed with the given job name.

        :param job_name: Desired name of the job, optional.
        :return: The result of the job execution, containing elapsed time and accumulators.
        """
        if job_name is None:
            return JobExecutionResult(self._j_execution_environment.execute())
        else:
            return JobExecutionResult(self._j_execution_environment.execute(job_name))

    def get_execution_plan(self) -> str:
        """
        Creates the plan with which the system will execute the program, and returns it as
        a String using a JSON representation of the execution data flow graph.
        Note that this needs to be called, before the plan is executed.

        If the compiler could not be instantiated, or the master could not
        be contacted to retrieve information relevant to the execution planning,
        an exception will be thrown.

        :return: The execution plan of the program, as a JSON String.
        """
        return self._j_execution_environment.getExecutionPlan()

    @staticmethod
    def get_execution_environment() -> 'ExecutionEnvironment':
        """
        Creates an execution environment that represents the context in which the program is
        currently executed. If the program is invoked standalone, this method returns a local
        execution environment. If the program is invoked from within the command line client to be
        submitted to a cluster, this method returns the execution environment of this cluster.

        :return: The :class:`ExecutionEnvironment` of the context in which the program is executed.
        """
        gateway = get_gateway()
        j_execution_environment = gateway.jvm.org.apache.flink.api.java.ExecutionEnvironment\
            .getExecutionEnvironment()
        return ExecutionEnvironment(j_execution_environment)
