Start流程

流式计算在运行时可以拉起多个相同实例进行扩容,所以不能直接启动上述已经构建好的拓扑图,需要将上述构建好的拓扑 图保存起来,需要扩容时,直接拿出算子的副本,实例化启动即可。

统一管理点

  • 加载统一管理点IConfigurableService;

    三种方式存储:Memory, db, file

  • PipelineBuilder的build方法,将构建构成中保存起来的IConfigurable,source和statge都是IConfigurable, 保存到IConfigurableService中;

  • IConfigurableService的refreshConfigurable方法;

    1.主要做的事可以概括:从统一管理点加载出组件,赋值,init,在调用后置方法doProcessAfterRefreshConfigurable。

    2.ChainPipeline的后置方法比较特殊,会调用pipeline中各个组件的后置方法,如果这个组件是普通UDFChainStage, 那么将会反序列化,实例成StageBuilder。如果是WindowChainStage,会讲用户数据接收的window实例化出来。

    3.从IConfigurable中加载实例副本出来;

    4.将实例副本赋值;

    5.初始化实例副本,实例都是AbstractConfigurable的继承类,调用他的的init方法。比如在初始化rocketmqSource 的时候,就会在此时调用init方法,先于启动方法调用;

    6.调用IConfigurable的doProcessAfterRefreshConfigurable方法,目前只有ChainPipeline会调用, (典型的是ChainPipeline),会在此方法中构建label与stage映射的stageMap;设置source;再调用 ChainPipeline中各个stage的doProcessAfterRefreshConfigurable方法;

    7.这里ChainPipeline的stage都是UDFChainStage类似。UDFChainStage的 doProcessAfterRefreshConfigurable方法会将之前序列化好的StageBuilder反序列化,成为StageBuilder实例。

    8.如果这个stage是window类型的WindowChainStage,ChainPipeline调用各个stage的 doProcessAfterRefreshConfigurable。这里会将用于数据接收的window实例化赋值;

    9.OutputChainStage此时会从统一管理点IConfigurableService查询出sink实例,并赋值给自己sink字段;

ChainPipeline的启动

pipeline.startChannel();

将ChainPipeline作为整个数据接收的入口,并启动source;

当source有数据进来时,ChainPipeline将会收到数据;具体方法是ChainPipeline的doMessageInner方法;

该方法将数据封装承AbstractContext后,向后传递;