blob: b92662bb00944836102864cfebebe378dfd11bfe [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
''' mock_proto.py '''
from heronpy.api import api_constants
import heron.proto.execution_state_pb2 as protoEState
import heron.proto.physical_plan_pb2 as protoPPlan
import heron.proto.packing_plan_pb2 as protoPackingPlan
import heron.proto.tmaster_pb2 as protoTmaster
import heron.proto.topology_pb2 as protoTopology
# pylint: disable=no-self-use, missing-docstring
class MockProto(object):
''' Mocking Proto'''
topology_name = "mock_topology_name"
topology_id = "mock_topology_id"
cluster = "mock_topology_cluster"
environ = "mock_topology_environ"
def create_mock_spout(self,
spout_name,
output_streams,
spout_parallelism):
spout = protoTopology.Spout()
spout.comp.name = spout_name
kv = spout.comp.config.kvs.add()
kv.key = api_constants.TOPOLOGY_COMPONENT_PARALLELISM
kv.type = protoTopology.ConfigValueType.Value('STRING_VALUE')
kv.value = str(spout_parallelism)
for stream in output_streams:
spout.outputs.add().stream.CopyFrom(stream)
return spout
def create_mock_resource(self):
resource = protoPackingPlan.Resource()
resource.cpu = 1.0
resource.ram = 1024
resource.disk = 1024 * 2
return resource
def create_mock_instance_plan(self):
instancePlan = protoPackingPlan.InstancePlan()
instancePlan.component_name = "word"
instancePlan.task_id = 1
instancePlan.component_index = 1
instancePlan.resource.CopyFrom(self.create_mock_resource())
return instancePlan
def create_mock_simple_container_plan(self):
containerPlan = protoPackingPlan.ContainerPlan()
containerPlan.id = 1
containerPlan.instance_plans.extend([self.create_mock_instance_plan()])
containerPlan.requiredResource.CopyFrom(self.create_mock_resource())
return containerPlan
def create_mock_simple_container_plan2(self):
containerPlan = protoPackingPlan.ContainerPlan()
containerPlan.id = 1
containerPlan.instance_plans.extend([self.create_mock_instance_plan()])
containerPlan.requiredResource.CopyFrom(self.create_mock_resource())
containerPlan.scheduledResource.CopyFrom(self.create_mock_resource())
return containerPlan
def create_mock_bolt(self,
bolt_name,
input_streams,
output_streams,
bolt_parallelism):
bolt = protoTopology.Bolt()
bolt.comp.name = bolt_name
kv = bolt.comp.config.kvs.add()
kv.key = api_constants.TOPOLOGY_COMPONENT_PARALLELISM
kv.type = protoTopology.ConfigValueType.Value('STRING_VALUE')
kv.value = str(bolt_parallelism)
for stream in input_streams:
bolt.inputs.add().stream.CopyFrom(stream)
for stream in output_streams:
bolt.outputs.add().stream.CopyFrom(stream)
return bolt
def create_mock_simple_topology(
self,
spout_parallelism=1,
bolt_parallelism=1):
"""
Simple topology contains one spout and one bolt.
"""
topology = protoTopology.Topology()
topology.id = MockProto.topology_id
topology.name = MockProto.topology_name
# Stream1
stream1 = protoTopology.StreamId()
stream1.id = "mock_stream1"
stream1.component_name = "mock_spout"
# Spout1
spout = self.create_mock_spout("mock_spout", [stream1], spout_parallelism)
topology.spouts.extend([spout])
# Bolt1
bolt = self.create_mock_bolt("mock_bolt", [stream1], [], bolt_parallelism)
topology.bolts.extend([bolt])
return topology
def create_mock_medium_topology(
self,
spout_parallelism=1,
bolt1_parallelism=1,
bolt2_parallelism=1,
bolt3_parallelism=1):
"""
Medium topology is a three stage topology
with one spout, two mid stage bolts, and one
last stage bolt.
S -str1-> B1 -str3-> B3
S -str2-> B2 -str4-> B3
"""
topology = protoTopology.Topology()
topology.id = "mock_topology_id"
topology.name = "mock_topology_name"
# Streams
stream1 = protoTopology.StreamId()
stream1.id = "mock_stream1"
stream1.component_name = "mock_spout1"
stream2 = protoTopology.StreamId()
stream2.id = "mock_stream2"
stream2.component_name = "mock_spout1"
stream3 = protoTopology.StreamId()
stream3.id = "mock_stream3"
stream3.component_name = "mock_bolt1"
stream4 = protoTopology.StreamId()
stream4.id = "mock_stream4"
stream4.component_name = "mock_bolt2"
# Spouts
spout1 = self.create_mock_spout("mock_spout1",
[stream1, stream2],
spout_parallelism)
topology.spouts.extend([spout1])
# Bolts
bolt1 = self.create_mock_bolt("mock_bolt1",
[stream1],
[stream3],
bolt1_parallelism)
bolt2 = self.create_mock_bolt("mock_bolt2",
[stream2],
[stream4],
bolt2_parallelism)
bolt3 = self.create_mock_bolt("mock_bolt3",
[stream3, stream4],
[],
bolt3_parallelism)
topology.bolts.extend([bolt1, bolt2, bolt3])
return topology
def create_mock_simple_physical_plan(
self,
spout_parallelism=1,
bolt_parallelism=1):
pplan = protoPPlan.PhysicalPlan()
pplan.topology.CopyFrom(self.create_mock_simple_topology(
spout_parallelism,
bolt_parallelism))
return pplan
def create_mock_simple_packing_plan(
self):
packingPlan = protoPackingPlan.PackingPlan()
packingPlan.id = "ExclamationTopology"
packingPlan.container_plans.extend([self.create_mock_simple_container_plan()])
return packingPlan
def create_mock_simple_packing_plan2(
self):
packingPlan = protoPackingPlan.PackingPlan()
packingPlan.id = "ExclamationTopology"
packingPlan.container_plans.extend([self.create_mock_simple_container_plan2()])
packingPlan.container_plans.extend([self.create_mock_simple_container_plan()])
return packingPlan
def create_mock_medium_physical_plan(
self,
spout_parallelism=1,
bolt1_parallelism=1,
bolt2_parallelism=1,
bolt3_parallelism=1):
pplan = protoPPlan.PhysicalPlan()
pplan.topology.CopyFrom(self.create_mock_medium_topology(
spout_parallelism,
bolt1_parallelism,
bolt2_parallelism,
bolt3_parallelism))
return pplan
def create_mock_execution_state(self):
estate = protoEState.ExecutionState()
estate.topology_name = MockProto.topology_name
estate.topology_id = MockProto.topology_id
estate.cluster = MockProto.cluster
estate.environ = MockProto.environ
return estate
def create_mock_tmaster(self):
tmaster = protoTmaster.TMasterLocation()
return tmaster
def add_topology_config(self, topology, key, value):
kv = topology.topology_config.kvs.add()
kv.key = key
kv.type = protoTopology.ConfigValueType.Value('STRING_VALUE')
kv.value = str(value)