blob: 0787d420f68aed53a3d85f30adb0900bff083f82 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.streams.common.topology;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.RemoveSplitMessage;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage;
import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.Context;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.topology.model.IStageHandle;
import org.apache.rocketmq.streams.common.topology.model.Pipeline;
import org.apache.rocketmq.streams.common.topology.stages.UnionChainStage;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
/**
* 如果某个节点有多个pipline构成,可以继承此类,如union,join节点
*
* @param <T>
*/
public abstract class AbstractMutilPipelineChainPipline<T extends IMessage> extends ChainStage<T> implements IAfterConfigurableRefreshListener {
/**
* pipeline name,这是一个汇聚节点,会有多个pipline,这里存的是pipline name
*/
protected List<String> piplineNames = new ArrayList<>();
//每个pipline,对应一个消息来源,在消息头上会有消息来源的name,根据name转发数据
protected Map<String, String> piplineName2MsgSourceName;
/**
* piplineNames的对象表示
*/
protected transient Map<String, ChainPipeline> piplines = null;
protected transient IStageHandle handle = new IStageHandle() {
@Override
protected IMessage doProcess(IMessage message, AbstractContext context) {
if (CollectionUtil.isEmpty(piplines)) {
return message;
}
String msgSourceName = message.getHeader().getMsgRouteFromLable();
if (piplines.size() > 0) {
List<IMessage> messages = new ArrayList<>();
Iterator<Entry<String, String>> it = piplineName2MsgSourceName.entrySet().iterator();
while (it.hasNext()) {
Entry<String, String> entry = it.next();
String piplineName = entry.getKey();
String value = entry.getValue();
if (msgSourceName != null && msgSourceName.equals(value)) {//如果来源数据的标签和map中的相同,转发这条消息给对应的pipline
ChainPipeline pipline = piplines.get(piplineName);
IMessage copyMessage = message.deepCopy();
//copyMessage.getMessageBody().put(ORI_MESSAGE_KEY,message.getMessageBody());
// 保留一份最原始的数据,后续对字段的修改不影响这个字段
Context newContext = new Context(copyMessage);
copyMessage.getHeader().setMsgRouteFromLable(msgSourceName);
boolean needReturn = executePipline(pipline, copyMessage, newContext, msgSourceName);
if (needReturn) {
return message;
}
if (newContext.isContinue()) {
if (newContext.isSplitModel()) {
messages.addAll(newContext.getSplitMessages());
} else {
messages.add(copyMessage);
}
}
}
}
for (IMessage msg : messages) {
msg.getHeader().setMsgRouteFromLable(msgSourceName);
}
doMessageAfterFinishPipline(message, context, messages);
return message;
}
;
return message;
}
@Override
public String getName() {
return UnionChainStage.class.getName();
}
};
/**
* 找到对应的pipline,并完成执行。如果只要一个pipline满足就可以,返回true,否则返回false
*
* @param copyMessage
* @param newContext
* @param msgSourceName
* @return 不继续处理其他pipline 返回true,否则返回false
*/
protected abstract boolean executePipline(ChainPipeline pipline, IMessage copyMessage, Context newContext, String msgSourceName);
/**
* 如果所有的pipline处理完,还需要继续处理pipline产生的消息,则实现这个方法
*
* @param message
* @param context
* @param messages
*/
protected abstract void doMessageAfterFinishPipline(IMessage message, AbstractContext context, List<IMessage> messages);
@Override
public boolean isAsyncNode() {
for (Pipeline pipline : piplines.values()) {
if (pipline.isAsynNode() == false) {
return false;
}
}
return true;
}
@Override
public void checkpoint(IMessage message, AbstractContext context, CheckPointMessage checkPointMessage) {
sendSystem(message, context, piplines.values());
}
@Override
public void addNewSplit(IMessage message, AbstractContext context, NewSplitMessage newSplitMessage) {
sendSystem(message, context, piplines.values());
}
@Override
public void removeSplit(IMessage message, AbstractContext context, RemoveSplitMessage removeSplitMessage) {
sendSystem(message, context, piplines.values());
}
@Override
public void batchMessageFinish(IMessage message, AbstractContext context, BatchFinishMessage checkPointMessage) {
sendSystem(message, context, piplines.values());
}
public void addPipline(ChainPipeline pipline) {
this.piplineNames.add(pipline.getConfigureName());
}
@Override
protected IStageHandle selectHandle(T t, AbstractContext context) {
return handle;
}
@Override
public void doProcessAfterRefreshConfigurable(IConfigurableService configurableService) {
if (piplineNames == null) {
return;
}
Map<String, ChainPipeline> piplineMap = new HashMap<>();
for (String pipeLineName : piplineNames) {
ChainPipeline chainPipline = configurableService.queryConfigurable(Pipeline.TYPE, pipeLineName);
if (chainPipline != null) {
piplineMap.put(chainPipline.getConfigureName(), chainPipline);
}
}
this.piplines = piplineMap;
}
public List<String> getPiplineNames() {
return piplineNames;
}
public void setPiplineNames(List<String> piplineNames) {
this.piplineNames = piplineNames;
}
public List<ChainPipeline> getPiplines() {
List<ChainPipeline> piplines = new ArrayList<>();
piplines.addAll(this.piplines.values());
return piplines;
}
public Map<String, String> getPiplineName2MsgSourceName() {
return piplineName2MsgSourceName;
}
public ChainPipeline getPipeline(String pipelineName){
return this.piplines.get(pipelineName);
}
public void setPiplineName2MsgSourceName(Map<String, String> piplineName2MsgSourceName) {
this.piplineName2MsgSourceName = piplineName2MsgSourceName;
}
}