blob: b542923d2f7ccd20339e587c2b45e89f00f63e8d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.streams.core.function.supplier;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
import org.apache.rocketmq.streams.core.function.SelectAction;
import org.apache.rocketmq.streams.core.function.accumulator.Accumulator;
import org.apache.rocketmq.streams.core.running.AbstractWindowProcessor;
import org.apache.rocketmq.streams.core.running.Processor;
import org.apache.rocketmq.streams.core.running.StreamContext;
import org.apache.rocketmq.streams.core.window.Window;
import org.apache.rocketmq.streams.core.window.WindowInfo;
import org.apache.rocketmq.streams.core.window.WindowKey;
import org.apache.rocketmq.streams.core.window.WindowState;
import org.apache.rocketmq.streams.core.window.WindowStore;
import org.apache.rocketmq.streams.core.util.Pair;
import org.apache.rocketmq.streams.core.util.Utils;
import org.apache.rocketmq.streams.core.window.fire.AccumulatorWindowFire;
import org.apache.rocketmq.streams.core.window.fire.AccumulatorSessionWindowFire;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
public class WindowAccumulatorSupplier<K, V, R, OV> implements Supplier<Processor<V>> {
private static final Logger logger = LoggerFactory.getLogger(WindowAccumulatorSupplier.class.getName());
private final String name;
private WindowInfo windowInfo;
private SelectAction<R, V> selectAction;
private Accumulator<R, OV> accumulator;
public WindowAccumulatorSupplier(String name, WindowInfo windowInfo,
SelectAction<R, V> selectAction, Accumulator<R, OV> accumulator) {
this.name = name;
this.windowInfo = windowInfo;
this.selectAction = selectAction;
this.accumulator = accumulator;
}
@Override
public Processor<V> get() {
WindowInfo.WindowType windowType = windowInfo.getWindowType();
switch (windowType) {
case SLIDING_WINDOW:
case TUMBLING_WINDOW:
return new WindowAccumulatorProcessor(name, windowInfo, selectAction, accumulator);
case SESSION_WINDOW:
return new SessionWindowAccumulatorProcessor(name, windowInfo, selectAction, accumulator);
default:
throw new RuntimeException("window type is error, WindowType=" + windowType);
}
}
public class WindowAccumulatorProcessor extends AbstractWindowProcessor<V> {
private final WindowInfo windowInfo;
private String name;
private MessageQueue stateTopicMessageQueue;
private SelectAction<R, V> selectAction;
private Accumulator<R, OV> accumulator;
private WindowStore<K, Accumulator<R, OV>> windowStore;
private final AtomicReference<Throwable> errorReference = new AtomicReference<>(null);
public WindowAccumulatorProcessor(String name, WindowInfo windowInfo, SelectAction<R, V> selectAction, Accumulator<R, OV> accumulator) {
this.name = String.join(Constant.SPLIT, name, WindowAccumulatorProcessor.class.getSimpleName());
this.windowInfo = windowInfo;
this.selectAction = selectAction;
this.accumulator = accumulator;
}
@Override
public void preProcess(StreamContext<V> context) throws RecoverStateStoreThrowable {
super.preProcess(context);
this.windowStore = new WindowStore<>(super.waitStateReplay(),
WindowState::byte2WindowState,
WindowState::windowState2Byte);
this.idleWindowScaner = context.getDefaultWindowScaner();
String stateTopicName = context.getSourceTopic() + Constant.STATE_TOPIC_SUFFIX;
this.stateTopicMessageQueue = new MessageQueue(stateTopicName, context.getSourceBrokerName(), context.getSourceQueueId());
this.accumulatorWindowFire = new AccumulatorWindowFire<>(this.windowStore,
context.copy(),
this.stateTopicMessageQueue,
this::watermark);
}
/**
* 维持一个watermark,小于watermark的数据都已经达到,触发窗口计算
*/
@Override
public void process(V data) throws Throwable {
Throwable throwable = errorReference.get();
if (throwable != null) {
errorReference.set(null);
throw throwable;
}
K key = this.context.getKey();
long time = this.context.getDataTime();
long watermark = this.watermark(time - allowDelay, stateTopicMessageQueue);
if (time < watermark) {
//delay data.
logger.warn("discard delay data:[{}]. time of data:{}, watermark:{}", data, time, watermark);
return;
}
//f(time) -> List<Window>
List<Window> windows = super.calculateWindow(windowInfo, time);
for (Window window : windows) {
logger.debug("timestamp=" + time + ". time -> window: " + Utils.format(time) + "->" + window);
//f(Window + key, store) -> oldValue
//todo key 怎么转化成对应的string,只和key的值有关系
WindowKey windowKey = new WindowKey(name, super.toHexString(key), window.getEndTime(), window.getStartTime());
WindowState<K, Accumulator<R, OV>> oldState = this.windowStore.get(windowKey);
//f(oldValue, Agg) -> newValue
Accumulator<R, OV> storeAccumulator;
if (oldState == null || oldState.getValue() == null) {
storeAccumulator = accumulator.clone();
} else {
storeAccumulator = oldState.getValue();
}
R select = selectAction.select(data);
storeAccumulator.addValue(select);
//f(Window + key, newValue, store)
WindowState<K, Accumulator<R, OV>> state = new WindowState<>(key, storeAccumulator, time);
this.windowStore.put(stateTopicMessageQueue, windowKey, state);
this.idleWindowScaner.putAccumulatorWindowCallback(windowKey, this.accumulatorWindowFire);
}
try {
List<WindowKey> fire = this.accumulatorWindowFire.fire(name, watermark);
for (WindowKey windowKey : fire) {
this.idleWindowScaner.removeWindowKey(windowKey);
}
} catch (Throwable t) {
errorReference.compareAndSet(null, t);
}
}
}
private class SessionWindowAccumulatorProcessor extends AbstractWindowProcessor<V> {
private final String name;
private final WindowInfo windowInfo;
private MessageQueue stateTopicMessageQueue;
private SelectAction<R, V> selectAction;
private Accumulator<R, OV> accumulator;
private WindowStore<K, Accumulator<R, OV>> windowStore;
public SessionWindowAccumulatorProcessor(String name, WindowInfo windowInfo, SelectAction<R, V> selectAction, Accumulator<R, OV> accumulator) {
this.name = String.join(Constant.SPLIT, name, SessionWindowAccumulatorProcessor.class.getSimpleName());
this.windowInfo = windowInfo;
this.selectAction = selectAction;
this.accumulator = accumulator;
}
@Override
public void preProcess(StreamContext<V> context) throws RecoverStateStoreThrowable {
super.preProcess(context);
this.windowStore = new WindowStore<>(super.waitStateReplay(),
WindowState::byte2WindowState,
WindowState::windowState2Byte);
this.idleWindowScaner = context.getDefaultWindowScaner();
this.idleWindowScaner.initSessionTimeOut(windowInfo.getSessionTimeout().toMilliseconds());
String stateTopicName = context.getSourceTopic() + Constant.STATE_TOPIC_SUFFIX;
this.stateTopicMessageQueue = new MessageQueue(stateTopicName, context.getSourceBrokerName(), context.getSourceQueueId());
this.accumulatorSessionWindowFire = new AccumulatorSessionWindowFire<>(this.windowStore,
context.copy(),
this.stateTopicMessageQueue,
this::watermark);
}
@Override
public void process(V data) throws Throwable {
K key = this.context.getKey();
long time = this.context.getDataTime();
long watermark = this.watermark(time - allowDelay, stateTopicMessageQueue);
if (time < watermark) {
logger.warn("discard delay data:[{}]. time of data:{}, watermark:{}", data, time, watermark);
return;
}
//本地存储里面搜索下
Pair<Long, Long> newSessionWindowTime = fireIfSessionOut(key, data, time, watermark);
if (newSessionWindowTime != null) {
Accumulator<R, OV> temp = accumulator.clone();
R select = selectAction.select(data);
temp.addValue(select);
WindowState<K, Accumulator<R, OV>> state = new WindowState<>(key, temp, time);
if (time < state.getRecordEarliestTimestamp()) {
//更新最早时间戳,用于状态触发时候,作为session 窗口的begin时间戳
state.setRecordEarliestTimestamp(time);
}
WindowKey windowKey = new WindowKey(name, super.toHexString(key), newSessionWindowTime.getValue(), newSessionWindowTime.getKey());
logger.info("new session window, with key={}, valueTime={}, sessionBegin=[{}], sessionEnd=[{}]", key, Utils.format(time),
Utils.format(newSessionWindowTime.getKey()), Utils.format(newSessionWindowTime.getValue()));
this.windowStore.put(stateTopicMessageQueue, windowKey, state);
this.idleWindowScaner.putAccumulatorSessionWindowCallback(windowKey, this.accumulatorSessionWindowFire);
}
}
//使用前缀查询找到session state, 触发已经session out的 watermark
@SuppressWarnings("unchecked")
private Pair<Long/*sessionBegin*/, Long/*sessionEnd*/> fireIfSessionOut(K key, V data, long dataTime, long watermark) throws Throwable {
List<Pair<WindowKey, WindowState<K, Accumulator<R, OV>>>> pairs = this.windowStore.searchMatchKeyPrefix(name);
if (pairs.size() == 0) {
return new Pair<>(dataTime, dataTime + windowInfo.getSessionTimeout().toMilliseconds());
}
logger.debug("exist session state num={}", pairs.size());
//sessionEndTime小的先触发
Iterator<Pair<WindowKey, WindowState<K, Accumulator<R, OV>>>> iterator = pairs.iterator();
int count = 0;
long lastStateSessionEnd = 0;
long maxFireSessionEnd = Long.MIN_VALUE;
while (iterator.hasNext()) {
Pair<WindowKey, WindowState<K, Accumulator<R, OV>>> pair = iterator.next();
logger.debug("exist session state{}=[{}]", count++, pair);
WindowKey windowKey = pair.getKey();
long sessionEnd = windowKey.getWindowEnd();
if (count == pairs.size()) {
lastStateSessionEnd = sessionEnd;
}
//先触发一遍,触发后从集合中删除
if (sessionEnd < watermark) {
//触发state
List<WindowKey> fire = this.accumulatorSessionWindowFire.fire(name, watermark);
for (WindowKey delete : fire) {
this.idleWindowScaner.removeWindowKey(delete);
}
iterator.remove();
maxFireSessionEnd = Long.max(sessionEnd, maxFireSessionEnd);
}
}
if (dataTime < maxFireSessionEnd) {
logger.warn("late data, discard. key=[{}], data=[{}], dataTime < maxFireSessionEnd: [{}] < [{}]", key, data, dataTime, maxFireSessionEnd);
return null;
}
boolean createNewSessionWindow = false;
WindowKey needToDelete = null;
//再次遍历,找到数据属于某个窗口,如果窗口已经关闭,则只计算新的值,如果窗口没有关闭则计算新值、更新窗口边界、存储状态、删除老值
for (int i = 0; i < pairs.size(); i++) {
Pair<WindowKey, WindowState<K, Accumulator<R, OV>>> pair = pairs.get(i);
WindowKey windowKey = pair.getKey();
WindowState<K, Accumulator<R, OV>> state = pair.getValue();
if (windowKey.getWindowEnd() < dataTime) {
createNewSessionWindow = true;
} else if (windowKey.getWindowStart() <= dataTime) {
logger.debug("data belong to exist session window.dataTime=[{}], window:[{} - {}]", dataTime, Utils.format(windowKey.getWindowStart()), Utils.format(windowKey.getWindowEnd()));
Accumulator<R, OV> value = state.getValue();
R select = selectAction.select(data);
value.addValue(select);
//更新state
state.setValue(value);
state.setRecordLastTimestamp(dataTime);
if (dataTime < state.getRecordEarliestTimestamp()) {
//更新最早时间戳,用于状态触发时候,作为session 窗口的begin时间戳
state.setRecordEarliestTimestamp(dataTime);
}
//如果是最后一个窗口,更新窗口结束时间
if (i == pairs.size() - 1) {
long mayBeSessionEnd = dataTime + windowInfo.getSessionTimeout().toMilliseconds();
if (windowKey.getWindowEnd() < mayBeSessionEnd) {
logger.debug("update exist session window, before:[{} - {}], after:[{} - {}]", Utils.format(windowKey.getWindowStart()), Utils.format(windowKey.getWindowEnd()),
Utils.format(windowKey.getWindowStart()), Utils.format(mayBeSessionEnd));
//删除老状态
needToDelete = windowKey;
//需要保存的新状态
windowKey = new WindowKey(windowKey.getOperatorName(), windowKey.getKey2String(), mayBeSessionEnd, windowKey.getWindowStart());
}
}
} else {
logger.warn("discard data: key=[{}], data=[{}], dataTime=[{}], watermark=[{}]", key, data, dataTime, watermark);
}
this.windowStore.put(stateTopicMessageQueue, windowKey, state);
this.idleWindowScaner.putAccumulatorSessionWindowCallback(windowKey, this.accumulatorSessionWindowFire);
this.idleWindowScaner.removeOldAccumulatorSession(needToDelete);
this.windowStore.deleteByKey(needToDelete);
}
if (pairs.size() == 0 || createNewSessionWindow) {
return new Pair<>(lastStateSessionEnd, dataTime + windowInfo.getSessionTimeout().toMilliseconds());
}
return null;
}
}
}