merge from replaceDB
diff --git a/pom.xml b/pom.xml
index a865740..0423a7c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,7 +99,6 @@
<module>rocketmq-streams-channel-http</module>
<module>rocketmq-streams-state</module>
<module>rocketmq-streams-examples</module>
- <module>rocketmq-streams-checkpoint</module>
<module>rocketmq-streams-connectors</module>
<module>rocketmq-streams-channel-syslog</module>
<module>rocketmq-streams-channel-es</module>
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQOffset.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQOffset.java
deleted file mode 100644
index 6650744..0000000
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQOffset.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.streams;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.rocketmq.client.consumer.store.OffsetStore;
-import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.streams.common.channel.source.AbstractSupportShuffleSource;
-import org.apache.rocketmq.streams.common.utils.ReflectUtil;
-import org.apache.rocketmq.streams.debug.DebugWriter;
-import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;
-
-public class RocketMQOffset implements OffsetStore {
- protected OffsetStore offsetStore;
- protected AbstractSupportShuffleSource source;
- private AtomicBoolean starting;
-
- public RocketMQOffset(OffsetStore offsetStore, AbstractSupportShuffleSource source) {
- this.offsetStore = offsetStore;
- this.source = source;
- this.starting = new AtomicBoolean(true);
- }
-
- @Override
- public void load() throws MQClientException {
- offsetStore.load();
- }
-
- @Override
- public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
- offsetStore.updateOffset(mq, offset, increaseOnly);
- }
-
- @Override
- public long readOffset(MessageQueue mq, ReadOffsetType type) {
- return offsetStore.readOffset(mq, type);
- }
-
- @Override
- public void persistAll(Set<MessageQueue> mqs) {
- offsetStore.persistAll(mqs);
- }
-
- @Override
- public void persist(MessageQueue mq) {
- offsetStore.persist(mq);
- }
-
- @Override
- public void removeOffset(MessageQueue mq) {
- //todo 启动时第一次做rebalance时source中也没有原有消费mq,不做移除,做了会有副作用
- //后续整个checkpoint机制都会调整成异步,整块代码都不会保留,目前为了整体跑通,不做修改。
- if (starting.get()) {
- starting.set(false);
- } else {
- Set<String> splitIds = new HashSet<>();
- splitIds.add(new RocketMQMessageQueue(mq).getQueueId());
- source.removeSplit(splitIds);
- }
-
- offsetStore.removeOffset(mq);
- }
-
- @Override
- public Map<MessageQueue, Long> cloneOffsetTable(String topic) {
- return offsetStore.cloneOffsetTable(topic);
- }
-
- @Override
- public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- source.sendCheckpoint(new RocketMQMessageQueue(mq).getQueueId());
- if (DebugWriter.isOpenDebug()) {
- ConcurrentMap<MessageQueue, AtomicLong> offsetTable = ReflectUtil.getDeclaredField(this.offsetStore, "offsetTable");
- DebugWriter.getInstance(source.getTopic()).writeSaveOffset(mq, offsetTable.get(mq));
- }
- offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway);
- }
-}
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/MessageListenerDelegator.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/MessageListenerDelegator.java
new file mode 100644
index 0000000..08c047c
--- /dev/null
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/MessageListenerDelegator.java
@@ -0,0 +1,62 @@
+package org.apache.rocketmq.streams.source;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class MessageListenerDelegator implements MessageQueueListener {
+ private final MessageQueueListener delegator;
+ private Set<MessageQueue> lastDivided = null;
+ private Set<MessageQueue> removingQueue;
+
+
+ public MessageListenerDelegator(MessageQueueListener delegator) {
+ this.delegator = delegator;
+ }
+
+
+ @Override
+ public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+
+ //上一次分配有,但是这一次没有,需要对这些mq进行状态移除
+ if (lastDivided != null) {
+ this.removingQueue = new HashSet<>();
+ for (MessageQueue last : lastDivided) {
+ if (!mqDivided.contains(last)) {
+ removingQueue.add(last);
+ }
+ }
+ }
+
+ delegator.messageQueueChanged(topic, mqAll, mqDivided);
+
+ lastDivided = mqDivided;
+ }
+
+ public Set<MessageQueue> getLastDivided() {
+ return Collections.unmodifiableSet(this.lastDivided);
+ }
+
+ public Set<MessageQueue> getRemovingQueue() {
+ return Collections.unmodifiableSet(this.removingQueue);
+ }
+}
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
index 5d60532..b9f89e1 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
@@ -18,7 +18,9 @@
package org.apache.rocketmq.streams.source;
import com.alibaba.fastjson.JSONObject;
+
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -26,36 +28,27 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueByConfig;
-import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
-import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
-import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.streams.common.channel.source.AbstractSupportShuffleSource;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
-import org.apache.rocketmq.streams.common.utils.ReflectUtil;
-import org.apache.rocketmq.streams.debug.DebugWriter;
import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
@@ -64,24 +57,19 @@
protected static final Log LOG = LogFactory.getLog(RocketMQSource.class);
private static final String STRATEGY_AVERAGE = "average";
- private static final String STRATEGY_MACHINE = "machine";
- private static final String STRATEGY_CONFIG = "config";
@ENVDependence
- protected String tags = "*";
+ private String tags = SubscriptionData.SUB_ALL;
- /**
- * 消息队列命名空间接入点
- */
-
-
- protected Long pullIntervalMs;
-
- protected String strategyName;
-
- protected transient DefaultMQPushConsumer consumer;
- protected transient ConsumeFromWhere consumeFromWhere;//默认从哪里消费,不会被持久化。不设置默认从尾部消费
- protected transient String consumerOffset;//从哪里开始消费
+ private int userPullThreadNum = 1;
+ private long pullTimeout;
+ private long commitInternalMs = 1000;
+ private String strategyName;
+ private transient ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET;//默认从哪里消费,不会被持久化。不设置默认从尾部消费
+ private RPCHook rpcHook;
+ private transient DefaultLitePullConsumer pullConsumer;
+ private transient ExecutorService executorService;
+ private transient PullTask[] pullTasks;
public RocketMQSource() {
}
@@ -107,109 +95,55 @@
protected boolean startSource() {
try {
destroyConsumer();
- consumer = startConsumer();
+
+ this.pullConsumer = buildPullConsumer(topic, groupName, namesrvAddr, tags, rpcHook, consumeFromWhere);
+ this.pullConsumer.start();
+
+ if (this.executorService == null) {
+ this.executorService = new ThreadPoolExecutor(userPullThreadNum, userPullThreadNum, 0, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(1000), r -> new Thread(r, "RStream-poll-thread"));
+ }
+
+ pullTasks = new PullTask[userPullThreadNum];
+ for (int i = 0; i < userPullThreadNum; i++) {
+ pullTasks[i] = new PullTask(this.pullConsumer, pullTimeout, commitInternalMs);
+ this.executorService.execute(pullTasks[i]);
+ }
+
return true;
- } catch (Exception e) {
+ } catch (MQClientException e) {
setInitSuccess(false);
- e.printStackTrace();
throw new RuntimeException("start rocketmq channel error " + topic, e);
}
}
- protected DefaultMQPushConsumer startConsumer() {
- try {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
- if (pullIntervalMs != null) {
- consumer.setPullInterval(pullIntervalMs);
- }
- AllocateMessageQueueStrategy defaultStrategy = new AllocateMessageQueueAveragely();
- if (STRATEGY_AVERAGE.equalsIgnoreCase(this.strategyName)) {
- consumer.setAllocateMessageQueueStrategy(defaultStrategy);
- } else if (STRATEGY_MACHINE.equalsIgnoreCase(this.strategyName)) {
- //consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueByMachine(defaultStrategy));
- } else if (STRATEGY_CONFIG.equalsIgnoreCase(this.strategyName)) {
- consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueByConfig());
- }
+ public DefaultLitePullConsumer buildPullConsumer(String topic, String groupName, String namesrv, String tags,
+ RPCHook rpcHook, ConsumeFromWhere consumeFromWhere) throws MQClientException {
+ DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer(groupName, rpcHook);
+ pullConsumer.setNamesrvAddr(namesrv);
+ pullConsumer.setConsumeFromWhere(consumeFromWhere);
+ pullConsumer.subscribe(topic, tags);
+ pullConsumer.setAutoCommit(false);
+ pullConsumer.setPullBatchSize(1000);
- consumer.setPersistConsumerOffsetInterval((int) this.checkpointTime);
- consumer.setConsumeMessageBatchMaxSize(maxFetchLogGroupSize);
- consumer.setNamesrvAddr(this.namesrvAddr);
- if (consumeFromWhere != null) {
- consumer.setConsumeFromWhere(consumeFromWhere);
- if (consumerOffset != null) {
- consumer.setConsumeTimestamp(consumerOffset);
- }
- }
- Map<String, Boolean> isFirstDataForQueue = new HashMap<>();
- //consumer.setCommitOffsetWithPullRequestEnable(false);
- consumer.subscribe(topic, tags);
- consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
- try {
- int i = 0;
- for (MessageExt msg : msgs) {
+ MessageQueueListener origin = pullConsumer.getMessageQueueListener();
- JSONObject jsonObject = create(msg.getBody(), msg.getProperties());
+ MessageListenerDelegator delegator = new MessageListenerDelegator(origin);
- String queueId = RocketMQMessageQueue.getQueueId(context.getMessageQueue());
- String offset = msg.getQueueOffset() + "";
- org.apache.rocketmq.streams.common.context.Message message = createMessage(jsonObject, queueId, offset, false);
- message.getHeader().setOffsetIsLong(true);
+ pullConsumer.setMessageQueueListener(delegator);
- if (DebugWriter.isOpenDebug()) {
- Boolean isFirstData = isFirstDataForQueue.get(queueId);
- if (isFirstData == null) {
- synchronized (this) {
- isFirstData = isFirstDataForQueue.get(queueId);
- if (isFirstData == null) {
- isFirstDataForQueue.put(queueId, true);
- }
- DebugWriter.getInstance(getTopic()).receiveFirstData(queueId, msg.getQueueOffset());
- }
- }
- }
-
- if (i == msgs.size() - 1) {
- message.getHeader().setNeedFlush(true);
- }
- executeMessage(message);
- i++;
- }
- } catch (Exception e) {
-
- LOG.error("consume message from rocketmq error " + e, e);
- e.printStackTrace();
-
- }
-
- return ConsumeOrderlyStatus.SUCCESS;// 返回消费成功
- });
-
- setOffsetStore(consumer);
- consumer.start();
-
- return consumer;
- } catch (Exception e) {
- setInitSuccess(false);
- e.printStackTrace();
- throw new RuntimeException("start metaq channel error " + topic, e);
- }
+ return pullConsumer;
}
+
@Override
public List<ISplit> getAllSplits() {
try {
List<ISplit> messageQueues = new ArrayList<>();
- if (messageQueues == null || messageQueues.size() == 0) {
- Set<MessageQueue> metaqQueueSet = consumer.fetchSubscribeMessageQueues(this.topic);
- for (MessageQueue queue : metaqQueueSet) {
- RocketMQMessageQueue metaqMessageQueue = new RocketMQMessageQueue(queue);
- if (isNotDataSplit(metaqMessageQueue.getQueueId())) {
- continue;
- }
-
- messageQueues.add(metaqMessageQueue);
-
- }
+ Collection<MessageQueue> metaqQueueSet = this.pullConsumer.fetchMessageQueues(this.topic);
+ for (MessageQueue queue : metaqQueueSet) {
+ RocketMQMessageQueue metaqMessageQueue = new RocketMQMessageQueue(queue);
+ messageQueues.add(metaqMessageQueue);
}
return messageQueues;
} catch (MQClientException e) {
@@ -218,12 +152,13 @@
}
}
+ //todo 计算正在工作的分片?
@Override
public Map<String, List<ISplit>> getWorkingSplitsGroupByInstances() {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExt.setVipChannelEnabled(false);
defaultMQAdminExt.setAdminExtGroup(UUID.randomUUID().toString());
- defaultMQAdminExt.setInstanceName(this.consumer.getInstanceName());
+ defaultMQAdminExt.setInstanceName(this.pullConsumer.getInstanceName());
try {
defaultMQAdminExt.start();
Map<MessageQueue, String> queue2Instances = getMessageQueueAllocationResult(defaultMQAdminExt, this.groupName);
@@ -234,11 +169,7 @@
continue;
}
String instanceName = queue2Instances.get(messageQueue);
- List<ISplit> splits = instanceOwnerQueues.get(instanceName);
- if (splits == null) {
- splits = new ArrayList<>();
- instanceOwnerQueues.put(instanceName, splits);
- }
+ List<ISplit> splits = instanceOwnerQueues.computeIfAbsent(instanceName, k -> new ArrayList<>());
splits.add(metaqMessageQueue);
}
return instanceOwnerQueues;
@@ -250,9 +181,9 @@
}
}
- protected Map<MessageQueue, String> getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt,
- String groupName) {
- HashMap results = new HashMap();
+ private Map<MessageQueue, String> getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt,
+ String groupName) {
+ HashMap<MessageQueue, String> results = new HashMap<>();
try {
ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName);
@@ -276,49 +207,9 @@
return results;
}
- /**
- * 设置offset存储,包装原有的RemoteBrokerOffsetStore,在保存offset前发送系统消息 this method suggest to be removed, use check barrier to achieve checkpoint asynchronous.
- *
- * @param consumer
- */
- protected void setOffsetStore(DefaultMQPushConsumer consumer) {
- DefaultMQPushConsumerImpl defaultMQPushConsumer = consumer.getDefaultMQPushConsumerImpl();
- if (consumer.getMessageModel() == MessageModel.CLUSTERING) {
- consumer.changeInstanceNameToPID();
- }
- MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(defaultMQPushConsumer.getDefaultMQPushConsumer());
- RemoteBrokerOffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, NamespaceUtil.wrapNamespace(consumer.getNamespace(), consumer.getConsumerGroup())) {
- Set<MessageQueue> firstComing = new HashSet<>();
- @Override
- public void removeOffset(MessageQueue mq) {
- if (!firstComing.contains(mq)){
- firstComing.add(mq);
- } else {
- Set<String> splitIds = new HashSet<>();
- splitIds.add(new RocketMQMessageQueue(mq).getQueueId());
- removeSplit(splitIds);
- }
- super.removeOffset(mq);
- }
-
- @Override
- public void updateConsumeOffsetToBroker(MessageQueue mq, long offset,
- boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- sendCheckpoint(new RocketMQMessageQueue(mq).getQueueId());
- if (DebugWriter.isOpenDebug()) {
- ConcurrentMap<MessageQueue, AtomicLong> offsetTable = ReflectUtil.getDeclaredField(this, "offsetTable");
- DebugWriter.getInstance(getTopic()).writeSaveOffset(mq, offsetTable.get(mq));
- }
- // LOG.info("the queue Id is " + new RocketMQMessageQueue(mq).getQueueId() + ",rocketmq start save offset,the save time is " + DateUtil.getCurrentTimeString());
- super.updateConsumeOffsetToBroker(mq, offset, isOneway);
- }
- };
- consumer.setOffsetStore(offsetStore);//每个一分钟运行一次
- }
-
@Override
protected boolean isNotDataSplit(String queueId) {
- return queueId.toUpperCase().startsWith("RETRY") || queueId.toUpperCase().startsWith("%RETRY%");
+ return false;
}
@Override
@@ -337,21 +228,24 @@
}
public void destroyConsumer() {
- List<DefaultMQPushConsumer> oldConsumers = new ArrayList<>();
- if (consumer != null) {
- oldConsumers.add(consumer);
- }
- try {
- for (DefaultMQPushConsumer consumer : oldConsumers) {
- consumer.shutdown();
- }
-
- } catch (Throwable t) {
- if (LOG.isWarnEnabled()) {
- LOG.warn(t.getMessage(), t);
- }
+ if (this.pullConsumer == null || this.pullTasks == null || this.pullTasks.length == 0) {
+ return;
}
+ //不在拉取新的数据
+ for (PullTask pullTask : pullTasks) {
+ pullTask.shutdown();
+ }
+
+ //线程池关闭
+ this.executorService.shutdown();
+
+ //关闭消费实例
+ this.pullConsumer.shutdown();
+ }
+
+ public void commit(Set<MessageQueue> messageQueues) {
+ this.pullConsumer.commit(messageQueues, true);
}
@Override
@@ -360,12 +254,83 @@
destroyConsumer();
}
- public String getStrategyName() {
- return strategyName;
- }
+ public class PullTask implements Runnable {
+ private final long pullTimeout;
+ private final long commitInternalMs;
+ private volatile long lastCommit = 0L;
- public void setStrategyName(String strategyName) {
- this.strategyName = strategyName;
+ private final DefaultLitePullConsumer pullConsumer;
+ private final MessageListenerDelegator delegator;
+
+ private volatile boolean isStopped = false;
+
+ public PullTask(DefaultLitePullConsumer pullConsumer, long pullTimeout, long commitInternalMs) {
+ this.pullConsumer = pullConsumer;
+ this.delegator = (MessageListenerDelegator) pullConsumer.getMessageQueueListener();
+ this.pullTimeout = pullTimeout == 0 ? pullConsumer.getPollTimeoutMillis() : pullTimeout;
+ this.commitInternalMs = commitInternalMs;
+ }
+
+ @Override
+ public void run() {
+
+ while (!this.isStopped) {
+
+ synchronized (this.pullConsumer) {
+ Set<MessageQueue> removingQueue = this.delegator.getRemovingQueue();
+ if (removingQueue != null && removingQueue.size() != 0) {
+ try {
+ Set<String> splitIds = new HashSet<>();
+ for (MessageQueue mq : removingQueue) {
+ splitIds.add(new RocketMQMessageQueue(mq).getQueueId());
+ }
+
+ RocketMQSource.this.removeSplit(splitIds);
+ } finally {
+ removingQueue.clear();
+ }
+ }
+ }
+
+
+ List<MessageExt> msgs = pullConsumer.poll(pullTimeout);
+
+ int i = 0;
+ for (MessageExt msg : msgs) {
+ JSONObject jsonObject = create(msg.getBody(), msg.getProperties());
+
+ String topic = msg.getTopic();
+ int queueId = msg.getQueueId();
+ String brokerName = msg.getBrokerName();
+ MessageQueue queue = new MessageQueue(topic, brokerName, queueId);
+ String unionQueueId = RocketMQMessageQueue.getQueueId(queue);
+
+
+ String offset = msg.getQueueOffset() + "";
+ org.apache.rocketmq.streams.common.context.Message message = createMessage(jsonObject, unionQueueId, offset, false);
+ message.getHeader().setOffsetIsLong(true);
+
+ if (i == msgs.size() - 1) {
+ message.getHeader().setNeedFlush(true);
+ }
+ executeMessage(message);
+ i++;
+ }
+
+ //拉取的批量消息处理完成以后判断是否提交位点;
+ synchronized (this.pullConsumer) {
+ if (System.currentTimeMillis() - lastCommit >= commitInternalMs || isStopped) {
+ lastCommit = System.currentTimeMillis();
+ //向broker提交消费位点,todo 从consumer那里拿不到正在消费哪些messageQueue
+ commit(this.delegator.getLastDivided());
+ }
+ }
+ }
+ }
+
+ public void shutdown() {
+ this.isStopped = true;
+ }
}
public String getTags() {
@@ -376,31 +341,45 @@
this.tags = tags;
}
- public DefaultMQPushConsumer getConsumer() {
- return consumer;
+ public Long getPullTimeout() {
+ return pullTimeout;
}
- public Long getPullIntervalMs() {
- return pullIntervalMs;
+ public void setPullTimeout(Long pullTimeout) {
+ this.pullTimeout = pullTimeout;
}
- public void setPullIntervalMs(Long pullIntervalMs) {
- this.pullIntervalMs = pullIntervalMs;
+ public String getStrategyName() {
+ return strategyName;
}
- public ConsumeFromWhere getConsumeFromWhere() {
- return consumeFromWhere;
+ public void setStrategyName(String strategyName) {
+ this.strategyName = strategyName;
}
- public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
- this.consumeFromWhere = consumeFromWhere;
+ public RPCHook getRpcHook() {
+ return rpcHook;
}
- public String getConsumerOffset() {
- return consumerOffset;
+ public void setRpcHook(RPCHook rpcHook) {
+ this.rpcHook = rpcHook;
}
- public void setConsumerOffset(String consumerOffset) {
- this.consumerOffset = consumerOffset;
+ public int getUserPullThreadNum() {
+ return userPullThreadNum;
}
+
+ public void setUserPullThreadNum(int userPullThreadNum) {
+ this.userPullThreadNum = userPullThreadNum;
+ }
+
+ public long getCommitInternalMs() {
+ return commitInternalMs;
+ }
+
+ public void setCommitInternalMs(long commitInternalMs) {
+ this.commitInternalMs = commitInternalMs;
+ }
+
+
}
\ No newline at end of file
diff --git a/rocketmq-streams-checkpoint/pom.xml b/rocketmq-streams-checkpoint/pom.xml
deleted file mode 100644
index 6da59e3..0000000
--- a/rocketmq-streams-checkpoint/pom.xml
+++ /dev/null
@@ -1,64 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>rocketmq-streams</artifactId>
- <groupId>org.apache.rocketmq</groupId>
- <version>1.0.2-preview-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>rocketmq-streams-checkpoint</artifactId>
- <name>ROCKETMQ STREAMS :: checkpoint</name>
- <packaging>jar</packaging>
-
- <properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-streams-commons</artifactId>
- <exclusions>
- <exclusion>
- <groupId>com.google.auto.service</groupId>
- <artifactId>auto-service</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-streams-db-operator</artifactId>
- <exclusions>
- <exclusion>
- <groupId>com.google.auto.service</groupId>
- <artifactId>auto-service</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.google.auto.service</groupId>
- <artifactId>auto-service</artifactId>
- <optional>true</optional>
- </dependency>
- </dependencies>
-
-</project>
\ No newline at end of file
diff --git a/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java b/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java
deleted file mode 100644
index 69150ac..0000000
--- a/rocketmq-streams-checkpoint/src/main/java/org/apache/rocketmq/streams/checkpoint/db/DBCheckPointStorage.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.checkpoint.db;
-
-import java.util.List;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.checkpoint.AbstractCheckPointStorage;
-import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
-import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
-import org.apache.rocketmq.streams.common.checkpoint.SourceSnapShot;
-import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
-
-/**
- * @description
- */
-public class DBCheckPointStorage extends AbstractCheckPointStorage {
-
- static final Log logger = LogFactory.getLog(DBCheckPointStorage.class);
- static final String STORAGE_NAME = "DB";
-
- public DBCheckPointStorage() {
-
- }
-
- @Override
- public String getStorageName() {
- return STORAGE_NAME;
- }
-
- @Override
- public <T> void save(List<T> checkPointState) {
- logger.info(String.format("save checkpoint size %d", checkPointState.size()));
- ORMUtil.batchReplaceInto(checkPointState);
- }
-
- @Override
- public void finish() {
-
- }
-
- @Override
- //todo
- public CheckPoint recover(ISource iSource, String queueId) {
- String sourceName = CheckPointManager.createSourceName(iSource, null);
- String key = CheckPointManager.createCheckPointKey(sourceName, queueId);
- String sql = "select * from source_snap_shot where `key` = " + "'" + key + "';";
- SourceSnapShot snapShot = ORMUtil.queryForObject(sql, null, SourceSnapShot.class);
-
- logger.info(String.format("checkpoint recover key is %s, sql is %s, recover sourceSnapShot : %s", key, sql, snapShot == null ? "null snapShot" : snapShot.toString()));
- return new CheckPoint().fromSnapShot(snapShot);
- }
-}
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
index 3b85e19..65ac261 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
@@ -141,7 +141,7 @@
value2StoreMap.put(groupValue, storeKey);
}
- List<WindowBaseValue> windowBaseValue =new ArrayList<>();
+ List<WindowBaseValue> windowBaseValue = new ArrayList<>();
RocksdbIterator<WindowBaseValue> rocksdbIterator = storage.getWindowBaseValue(instance.getSplitId(),
instance.getWindowInstanceId(), WindowType.SESSION_WINDOW, null);
@@ -423,10 +423,7 @@
}
}
-
-
doFire(queueId, windowInstance, toFireValueList, currentFireTime, nextFireTime);
-
return toFireValueList.size();
}
@@ -445,8 +442,10 @@
return false;
}
+
private void doFire(String queueId, WindowInstance instance, List<WindowValue> valueList, Long currentFireTime,
Long nextFireTime) {
+
if (CollectionUtil.isEmpty(valueList)) {
return;
}