window的实例化和初始化时机,与普通无状态算子一样,在构建DataStream阶段以stage形式加入pipeline。在启动 DataStream阶段完成window的初始化。
给window初始化WindowStorage用户状态存储;
WindowStorage包括localStorage存储和remoteStorage存储;localStorage使用RocksDB, remoteStorage使用mysql;
向window添加一个WindowCache的匿名实例,用于存储写入shuffle topic之前数据;
向window添加SQLCache,作为写入Mysql之前的缓存;
向window添加ShuffleChannel,作为写出shuffle和接收来自shufffle topic数据的通道;
AbstractShuffleWindow的doMessage方法,将数据写入shuffleChannel
public AbstractContext<IMessage> doMessage(IMessage message, AbstractContext context) { shuffleChannel.startChannel(); return super.doMessage(message, context); }
shuffleChannel.startChannel 启动shuffleChannel中的consumer,从shuffletopic中消费数据;如果有消费到数据,将由 shuffleChannel的doMessage处理。
AbstractWindow.doMessage方法
对于一条消息来说,window 首先需要检查是否有窗口实例,如果没有则创建。如果窗口实例已经超过最大的watermark, 数据丢弃,否则进行消息积累 消息会先经历batchAdd 然后flush加入到windowCache中;windowCache定时触发,加入到 shuffleMsgCache中,shuffleMsgCache中定时发出,用shuffleMsgCache中的producer写出到rocketmq。
ShuffleChannel#doMessage方法;
将shuffle消息加入到shuffleCache中
最终进入ShuffleCache#batchInsert中
WindowOperator#shuffleCalculate中
实际窗口计算:WindowValue#calculate
计算后并不会马上触发窗口,窗口需要定时出发
WindowFireSource#startSource启动定时任务,1s检查一次窗口是否触发WindowFireSource#fireWindowInstance WindowOperator#fireWindowInstance
windowFireSource.executeMessage
windowFireSource.executeMessage这个方法里面会执行pipeline的下个节点