# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
''' mock_proto.py '''
from heronpy.api import api_constants
import heron.proto.execution_state_pb2 as protoEState
import heron.proto.physical_plan_pb2 as protoPPlan
import heron.proto.packing_plan_pb2 as protoPackingPlan
import heron.proto.tmaster_pb2 as protoTmaster
import heron.proto.topology_pb2 as protoTopology

# pylint: disable=no-self-use, missing-docstring
class MockProto(object):
  ''' Mocking Proto'''
  topology_name = "mock_topology_name"
  topology_id = "mock_topology_id"
  cluster = "mock_topology_cluster"
  environ = "mock_topology_environ"

  def create_mock_spout(self,
                        spout_name,
                        output_streams,
                        spout_parallelism):
    spout = protoTopology.Spout()
    spout.comp.name = spout_name
    kv = spout.comp.config.kvs.add()
    kv.key = api_constants.TOPOLOGY_COMPONENT_PARALLELISM
    kv.type = protoTopology.ConfigValueType.Value('STRING_VALUE')
    kv.value = str(spout_parallelism)
    for stream in output_streams:
      spout.outputs.add().stream.CopyFrom(stream)
    return spout

  def create_mock_resource(self):
    resource = protoPackingPlan.Resource()
    resource.cpu = 1.0
    resource.ram = 1024
    resource.disk = 1024 * 2
    return resource

  def create_mock_instance_plan(self):
    instancePlan = protoPackingPlan.InstancePlan()
    instancePlan.component_name  = "word"
    instancePlan.task_id = 1
    instancePlan.component_index = 1
    instancePlan.resource.CopyFrom(self.create_mock_resource())
    return instancePlan

  def create_mock_simple_container_plan(self):
    containerPlan = protoPackingPlan.ContainerPlan()
    containerPlan.id = 1
    containerPlan.instance_plans.extend([self.create_mock_instance_plan()])
    containerPlan.requiredResource.CopyFrom(self.create_mock_resource())

    return containerPlan

  def create_mock_simple_container_plan2(self):
    containerPlan = protoPackingPlan.ContainerPlan()
    containerPlan.id = 1
    containerPlan.instance_plans.extend([self.create_mock_instance_plan()])
    containerPlan.requiredResource.CopyFrom(self.create_mock_resource())
    containerPlan.scheduledResource.CopyFrom(self.create_mock_resource())
    return containerPlan

  def create_mock_bolt(self,
                       bolt_name,
                       input_streams,
                       output_streams,
                       bolt_parallelism):
    bolt = protoTopology.Bolt()
    bolt.comp.name = bolt_name
    kv = bolt.comp.config.kvs.add()
    kv.key = api_constants.TOPOLOGY_COMPONENT_PARALLELISM
    kv.type = protoTopology.ConfigValueType.Value('STRING_VALUE')
    kv.value = str(bolt_parallelism)
    for stream in input_streams:
      bolt.inputs.add().stream.CopyFrom(stream)
    for stream in output_streams:
      bolt.outputs.add().stream.CopyFrom(stream)
    return bolt

  def create_mock_simple_topology(
      self,
      spout_parallelism=1,
      bolt_parallelism=1):
    """
    Simple topology contains one spout and one bolt.
    """
    topology = protoTopology.Topology()
    topology.id = MockProto.topology_id
    topology.name = MockProto.topology_name

    # Stream1
    stream1 = protoTopology.StreamId()
    stream1.id = "mock_stream1"
    stream1.component_name = "mock_spout"

    # Spout1
    spout = self.create_mock_spout("mock_spout", [stream1], spout_parallelism)
    topology.spouts.extend([spout])

    # Bolt1
    bolt = self.create_mock_bolt("mock_bolt", [stream1], [], bolt_parallelism)
    topology.bolts.extend([bolt])

    return topology

  def create_mock_medium_topology(
      self,
      spout_parallelism=1,
      bolt1_parallelism=1,
      bolt2_parallelism=1,
      bolt3_parallelism=1):
    """
    Medium topology is a three stage topology
    with one spout, two mid stage bolts, and one
    last stage bolt.
    S -str1-> B1 -str3-> B3
    S -str2-> B2 -str4-> B3
    """
    topology = protoTopology.Topology()
    topology.id = "mock_topology_id"
    topology.name = "mock_topology_name"

    # Streams
    stream1 = protoTopology.StreamId()
    stream1.id = "mock_stream1"
    stream1.component_name = "mock_spout1"

    stream2 = protoTopology.StreamId()
    stream2.id = "mock_stream2"
    stream2.component_name = "mock_spout1"

    stream3 = protoTopology.StreamId()
    stream3.id = "mock_stream3"
    stream3.component_name = "mock_bolt1"

    stream4 = protoTopology.StreamId()
    stream4.id = "mock_stream4"
    stream4.component_name = "mock_bolt2"

    # Spouts
    spout1 = self.create_mock_spout("mock_spout1",
                                    [stream1, stream2],
                                    spout_parallelism)
    topology.spouts.extend([spout1])

    # Bolts
    bolt1 = self.create_mock_bolt("mock_bolt1",
                                  [stream1],
                                  [stream3],
                                  bolt1_parallelism)
    bolt2 = self.create_mock_bolt("mock_bolt2",
                                  [stream2],
                                  [stream4],
                                  bolt2_parallelism)
    bolt3 = self.create_mock_bolt("mock_bolt3",
                                  [stream3, stream4],
                                  [],
                                  bolt3_parallelism)
    topology.bolts.extend([bolt1, bolt2, bolt3])


    return topology

  def create_mock_simple_physical_plan(
      self,
      spout_parallelism=1,
      bolt_parallelism=1):
    pplan = protoPPlan.PhysicalPlan()
    pplan.topology.CopyFrom(self.create_mock_simple_topology(
        spout_parallelism,
        bolt_parallelism))
    return pplan

  def create_mock_simple_packing_plan(
    self):
    packingPlan = protoPackingPlan.PackingPlan()
    packingPlan.id = "ExclamationTopology"
    packingPlan.container_plans.extend([self.create_mock_simple_container_plan()])
    return packingPlan

  def create_mock_simple_packing_plan2(
    self):
    packingPlan = protoPackingPlan.PackingPlan()
    packingPlan.id = "ExclamationTopology"
    packingPlan.container_plans.extend([self.create_mock_simple_container_plan2()])
    packingPlan.container_plans.extend([self.create_mock_simple_container_plan()])
    return packingPlan

  def create_mock_medium_physical_plan(
      self,
      spout_parallelism=1,
      bolt1_parallelism=1,
      bolt2_parallelism=1,
      bolt3_parallelism=1):
    pplan = protoPPlan.PhysicalPlan()
    pplan.topology.CopyFrom(self.create_mock_medium_topology(
        spout_parallelism,
        bolt1_parallelism,
        bolt2_parallelism,
        bolt3_parallelism))
    return pplan

  def create_mock_execution_state(self):
    estate = protoEState.ExecutionState()
    estate.topology_name = MockProto.topology_name
    estate.topology_id = MockProto.topology_id
    estate.cluster = MockProto.cluster
    estate.environ = MockProto.environ
    return estate

  def create_mock_tmaster(self):
    tmaster = protoTmaster.TMasterLocation()
    return tmaster

  def add_topology_config(self, topology, key, value):
    kv = topology.topology_config.kvs.add()
    kv.key = key
    kv.type = protoTopology.ConfigValueType.Value('STRING_VALUE')
    kv.value = str(value)
