blob: 7c7ddccae75674b5e5826dc1bcfc8d0d210bb88d [file] [log] [blame]
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Base spout for integration tests"""
import copy
from heron.common.src.python.utils.log import Log
from heronpy.api.spout.spout import Spout
from heronpy.api.stream import Stream
from heronpy.api.component.component_spec import HeronComponentSpec
import heron.common.src.python.pex_loader as pex_loader
from ..core import constants as integ_const
class IntegrationTestSpout(Spout):
"""Base spout for integration test
Every spout of integration test topology consists of this instance, each delegating user's spout.
"""
outputs = [Stream(fields=[integ_const.INTEGRATION_TEST_TERMINAL],
name=integ_const.INTEGRATION_TEST_CONTROL_STREAM_ID)]
@classmethod
def spec(cls, name, par, config, user_spout_classpath, user_output_fields=None):
python_class_path = "%s.%s" % (cls.__module__, cls.__name__)
config[integ_const.USER_SPOUT_CLASSPATH] = user_spout_classpath
# avoid modification to cls.outputs
_outputs = copy.copy(cls.outputs)
if user_output_fields is not None:
_outputs.extend(user_output_fields)
return HeronComponentSpec(name, python_class_path, is_spout=True, par=par,
inputs=None, outputs=_outputs, config=config)
def initialize(self, config, context):
user_spout_classpath = config.get(integ_const.USER_SPOUT_CLASSPATH, None)
if user_spout_classpath is None:
raise RuntimeError("User defined integration test spout was not found")
user_spout_cls = self._load_user_spout(context.get_topology_pex_path(), user_spout_classpath)
self.user_spout = user_spout_cls(delegate=self)
self.max_executions = config.get(integ_const.USER_MAX_EXECUTIONS, integ_const.MAX_EXECUTIONS)
assert isinstance(self.max_executions, int) and self.max_executions > 0
Log.info("Max executions: %d" % self.max_executions)
self.tuples_to_complete = 0
self.user_spout.initialize(config, context)
@staticmethod
def _load_user_spout(pex_file, classpath):
pex_loader.load_pex(pex_file)
cls = pex_loader.import_and_get_class(pex_file, classpath)
return cls
@property
def is_done(self):
return self.max_executions == 0
def next_tuple(self):
if self.is_done:
return
self.max_executions -= 1
Log.info("max executions: %d" % self.max_executions)
self.user_spout.next_tuple()
if self.is_done:
self._emit_terminal_if_needed()
Log.info("This topology is finished.")
def ack(self, tup_id):
Log.info("Received an ack with tuple id: %s" % str(tup_id))
self.tuples_to_complete -= 1
if tup_id != integ_const.INTEGRATION_TEST_MOCK_MESSAGE_ID:
self.user_spout.ack(tup_id)
self._emit_terminal_if_needed()
def fail(self, tup_id):
Log.info("Received a fail message with tuple id: %s" % str(tup_id))
self.tuples_to_complete -= 1
if tup_id != integ_const.INTEGRATION_TEST_MOCK_MESSAGE_ID:
self.user_spout.fail(tup_id)
self._emit_terminal_if_needed()
def emit(self, tup, tup_id=None, stream=Stream.DEFAULT_STREAM_ID,
direct_task=None, need_task_ids=None):
"""Emits from this integration test spout
Overriden method which will be called when user's spout calls emit()
"""
# if is_control True -> control stream should not count
self.tuples_to_complete += 1
if tup_id is None:
Log.info("Add tup_id for tuple: %s" % str(tup))
_tup_id = integ_const.INTEGRATION_TEST_MOCK_MESSAGE_ID
else:
_tup_id = tup_id
super(IntegrationTestSpout, self).emit(tup, _tup_id, stream, direct_task, need_task_ids)
def _emit_terminal_if_needed(self):
Log.info("is_done: %s, tuples_to_complete: %s" % (self.is_done, self.tuples_to_complete))
if self.is_done and self.tuples_to_complete == 0:
Log.info("Emitting terminals to downstream")
super(IntegrationTestSpout, self).emit([integ_const.INTEGRATION_TEST_TERMINAL],
stream=integ_const.INTEGRATION_TEST_CONTROL_STREAM_ID)