tree: 48df1972ebc4ce22b2fe9811a6d11c3433c65191 [path history] [tgz]
  1. streampipes/
  2. build-distribution.sh
  3. pom.xml
  4. README.md
  5. requirements.txt
  6. setup.py
streampipes-wrapper-python/README.md

Github Actions Docker pulls Maven central License Last commit Twitter

Apache StreamPipes Wrapper for Python [WIP]

NOTE:

The StreamPipes wrapper for python is currently under development. Thus, the processor model description still needs to be implemented externally in Java.

Apache StreamPipes

Apache StreamPipes enables flexible modeling of stream processing pipelines by providing a graphical modeling editor on top of existing stream processing frameworks.

It leverages non-technical users to quickly define and execute processing pipelines based on an easily extensible toolbox of data sources, data processors and data sinks. StreamPipes has an exchangeable runtime execution layer and executes pipelines using one of the provided wrappers, e.g., for Apache Flink or Apache Kafka Streams.

Pipeline elements in StreamPipes can be installed at runtime - the built-in SDK allows to easily implement new pipeline elements according to your needs. Pipeline elements are standalone microservices that can run anywhere - centrally on your server, in a large-scale cluster or close at the edge.

A Speudocode Example

NOTE: Only works in combination with Java!

from streampipes.core import StandaloneModelSubmitter
from streampipes.manager import Declarer
from streampipes.model.pipeline_element_config import Config
from streampipes.core import EventProcessor


class HelloWorldProcessor(EventProcessor):

    def on_invocation(self):
        pass

    def on_event(self, event):
        event['greeting'] = 'hello world'
        return event

    def on_detach(self):
        pass


def main():
    # Configurations to be stored in key-value store (consul)
    config = Config(app_id='pe/org.apache.streampipes.processor.python')

    config.register(type='host',
                    env_key='SP_HOST',
                    default='processor-python',
                    description='processor hostname')

    config.register(type='port',
                    env_key='SP_PORT',
                    default=8090,
                    description='processor port')

    config.register(type='service',
                    env_key='SP_SERVICE_NAME',
                    default='Python Processor',
                    description='processor service name')

    processors = {
        'org.apache.streampipes.processors.python.helloworld': HelloWorldProcessor,
    }

    # Declarer
    # add the dict of processors to the Declarer
    # This is an abstract class that holds the specified processors
    Declarer.add(processors=processors)

    # StandaloneModelSubmitter
    # Initializes the REST api
    StandaloneModelSubmitter.init(config=config)


if __name__ == '__main__':
    main()