window算子初始化

window的实例化和初始化时机,与普通无状态算子一样,在构建DataStream阶段以stage形式加入pipeline。在启动 DataStream阶段完成window的初始化。

img.png

  • 给window初始化WindowStorage用户状态存储;

    WindowStorage包括localStorage存储和remoteStorage存储;localStorage使用RocksDB, remoteStorage使用mysql;

  • 向window添加一个WindowCache的匿名实例,用于存储写入shuffle topic之前数据;

  • 向window添加SQLCache,作为写入Mysql之前的缓存;

  • 向window添加ShuffleChannel,作为写出shuffle和接收来自shufffle topic数据的通道;

ShuffleChannel写出shuffle数据

AbstractShuffleWindow的doMessage方法,将数据写入shuffleChannel

public AbstractContext<IMessage> doMessage(IMessage message, AbstractContext context) {
        shuffleChannel.startChannel();
        return super.doMessage(message, context);
}
  • shuffleChannel.startChannel 启动shuffleChannel中的consumer,从shuffletopic中消费数据;如果有消费到数据,将由 shuffleChannel的doMessage处理。

  • AbstractWindow.doMessage方法

对于一条消息来说,window 首先需要检查是否有窗口实例,如果没有则创建。如果窗口实例已经超过最大的watermark, 数据丢弃,否则进行消息积累 消息会先经历batchAdd 然后flush加入到windowCache中;windowCache定时触发,加入到 shuffleMsgCache中,shuffleMsgCache中定时发出,用shuffleMsgCache中的producer写出到rocketmq。

ShuffleChannel接收到shuffle数据

ShuffleChannel#doMessage方法;

将shuffle消息加入到shuffleCache中

最终进入ShuffleCache#batchInsert中

WindowOperator#shuffleCalculate中

实际窗口计算:WindowValue#calculate

计算后并不会马上触发窗口,窗口需要定时出发

window触发

WindowFireSource#startSource启动定时任务,1s检查一次窗口是否触发WindowFireSource#fireWindowInstance WindowOperator#fireWindowInstance

windowFireSource.executeMessage

windowFireSource.executeMessage这个方法里面会执行pipeline的下个节点