blob: ad9a7eb4e167a2f21faeeee3e66c7ed4087e41f8 [file] [log] [blame]
# Copyright 2016 - Twitter, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Streamlet for Apache Pulsar"""
from heronpy.dsl.streamlet import Streamlet
from heronpy.dsl.operation import OperationType
from heronpy.connectors.pulsar.pulsarspout import PulsarSpout
# pylint: disable=access-member-before-definition
# pylint: disable=attribute-defined-outside-init
class PulsarStreamlet(Streamlet):
  """Input streamlet that is realised as a PulsarSpout reading one Pulsar topic.

  The streamlet has no parents. When built, it registers a PulsarSpout with the
  builder it is given, configured from the values supplied at construction.
  """

  def __init__(self, service_url, topic_name, stage_name=None, parallelism=None,
               receive_timeout_ms=None, input_schema=None):
    """Initialise the base Streamlet as an input stage and remember the Pulsar settings.

    Args:
      service_url: stored and later handed to the spout under PulsarSpout.serviceUrl.
      topic_name: stored and later handed to the spout under PulsarSpout.topicName;
        also used when deriving a default stage name.
      stage_name: forwarded unchanged to Streamlet.__init__.
      parallelism: forwarded unchanged to Streamlet.__init__.
      receive_timeout_ms: optional; only added to the spout config when not None.
      input_schema: optional; only added to the spout config (as the
        deserializer entry) when not None.
    """
    super(PulsarStreamlet, self).__init__(parents=[],
                                          operation=OperationType.Input,
                                          stage_name=stage_name,
                                          parallelism=parallelism)
    self._pulsar_service_url = service_url
    self._pulsar_topic_name = topic_name
    self._pulsar_receive_timeout_ms = receive_timeout_ms
    self._pulsar_input_schema = input_schema

  @staticmethod
  def pulsarStreamlet(service_url, topic_name, stage_name=None, parallelism=None,
                      receive_timeout_ms=None, input_schema=None):
    """Validating factory: build a PulsarStreamlet after checking required arguments.

    Raises:
      RuntimeError: if service_url is None, or (checked second) topic_name is None.

    Returns:
      A new PulsarStreamlet constructed from the given arguments.
    """
    # The service url is validated before the topic name, so it wins when both are None.
    if service_url is None:
      raise RuntimeError("Pulsar Service Url cannot be None")
    if topic_name is None:
      raise RuntimeError("Pulsar Topic Name cannot be None")

    return PulsarStreamlet(service_url=service_url,
                           topic_name=topic_name,
                           stage_name=stage_name,
                           parallelism=parallelism,
                           receive_timeout_ms=receive_timeout_ms,
                           input_schema=input_schema)

  # pylint: disable=no-self-use
  def _calculate_inputs(self):
    """Return the inputs of this stage: always a fresh empty dict."""
    return {}

  def _calculate_stage_name(self, existing_stage_names):
    """Pick a stage name derived from the topic that is not in existing_stage_names.

    The first candidate is "pulsarspout-<topic>". On a clash a numeric suffix is
    appended directly to that base, starting at 2 and increasing until the
    candidate is unused.
    """
    base_name = "pulsarspout-" + self._pulsar_topic_name
    candidate = base_name
    suffix = 1
    while candidate in existing_stage_names:
      suffix += 1
      candidate = base_name + str(suffix)
    return candidate

  def _build_this(self, bldr):
    """Register a PulsarSpout for this stage on the given builder.

    The service url and topic name are always part of the spout config; the
    receive timeout and deserializer entries are included only when the
    corresponding constructor argument was not None.
    """
    spout_config = {
        PulsarSpout.serviceUrl: self._pulsar_service_url,
        PulsarSpout.topicName: self._pulsar_topic_name,
    }

    timeout_ms = self._pulsar_receive_timeout_ms
    if timeout_ms is not None:
      spout_config[PulsarSpout.receiveTimeoutMs] = timeout_ms

    schema = self._pulsar_input_schema
    if schema is not None:
      spout_config[PulsarSpout.deserializer] = schema

    bldr.add_spout(self._stage_name, PulsarSpout, par=self._parallelism, config=spout_config)