[RIP-74] Implement dynamic load balancing in Flink-Connector-RocketMQ (#126)
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceCheckEvent.java b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceCheckEvent.java
new file mode 100644
index 0000000..70a7206
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceCheckEvent.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rocketmq.common.event;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.Map;
+
+/**
+ * Event carrying the message queues assigned to one reader, sent by the enumerator so the
+ * receiving side can check its assignment.
+ *
+ * <p>Each entry maps a queue to a {@code (startingOffset, stoppingOffset)} pair.
+ */
+public class SourceCheckEvent implements SourceEvent {
+    // SourceEvent is Serializable; pin the stream version explicitly.
+    private static final long serialVersionUID = 1L;
+
+    /** Assigned queues, each mapped to its (starting offset, stopping offset) pair. */
+    private Map<MessageQueue, Tuple2<Long, Long>> assignedMq;
+
+    /**
+     * @return the assigned queues with their offset pairs, or {@code null} if never set
+     */
+    public Map<MessageQueue, Tuple2<Long, Long>> getAssignedMq() {
+        return assignedMq;
+    }
+
+    /**
+     * @param assignedMq the assigned queues with their (starting, stopping) offset pairs
+     */
+    public void setAssignedMq(Map<MessageQueue, Tuple2<Long, Long>> assignedMq) {
+        this.assignedMq = assignedMq;
+    }
+}
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceDetectEvent.java b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceDetectEvent.java
new file mode 100644
index 0000000..af8c4e3
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceDetectEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rocketmq.common.event;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+/**
+ * Event the enumerator sends to a reader whose initial assignment it has not yet received.
+ *
+ * <p>Presumably the reader answers with a {@code SourceInitAssignEvent}; confirm against the
+ * reader implementation.
+ */
+public class SourceDetectEvent implements SourceEvent {
+    // SourceEvent is Serializable; pin the stream version explicitly.
+    private static final long serialVersionUID = 1L;
+
+    /** Request to resend the initial allocation result; defaults to {@code true}. */
+    private boolean reSendInitAssign = true;
+
+    /**
+     * @return whether the initial allocation result should be resent
+     */
+    public boolean getReSendInitAssign() {
+        return reSendInitAssign;
+    }
+
+    /**
+     * @param reSendInitAssign whether the initial allocation result should be resent
+     */
+    public void setReSendInitAssign(boolean reSendInitAssign) {
+        this.reSendInitAssign = reSendInitAssign;
+    }
+}
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceInitAssignEvent.java b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceInitAssignEvent.java
new file mode 100644
index 0000000..347ec51
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceInitAssignEvent.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rocketmq.common.event;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
+
+import java.util.List;
+
+/**
+ * Event carrying the splits a reader currently holds, used by the enumerator to rebuild its
+ * assignment bookkeeping.
+ *
+ * <p>NOTE(review): the event is serialized, so {@code RocketMQSourceSplit} and the concrete list
+ * must be serializable as well; confirm.
+ */
+public class SourceInitAssignEvent implements SourceEvent {
+    // SourceEvent is Serializable; pin the stream version explicitly.
+    private static final long serialVersionUID = 1L;
+
+    /** Splits reported by the reader; may be {@code null} or empty. */
+    private List<RocketMQSourceSplit> splits;
+
+    /**
+     * @return the reported splits, or {@code null} if never set
+     */
+    public List<RocketMQSourceSplit> getSplits() {
+        return splits;
+    }
+
+    /**
+     * @param splits the splits to report
+     */
+    public void setSplits(List<RocketMQSourceSplit> splits) {
+        this.splits = splits;
+    }
+}
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceReportOffsetEvent.java b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceReportOffsetEvent.java
new file mode 100644
index 0000000..d850256
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceReportOffsetEvent.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rocketmq.common.event;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+/**
+ * Event reporting an offset for a single message queue to the enumerator.
+ *
+ * <p>The queue is identified by topic, broker and queue id. A {@code checkpoint} of {@code -1}
+ * (the default) means that no offset is carried; the enumerator ignores such events.
+ */
+public class SourceReportOffsetEvent implements SourceEvent {
+    // SourceEvent is Serializable; pin the stream version explicitly.
+    private static final long serialVersionUID = 1L;
+
+    private String topic;
+    private String broker;
+    private int queueId;
+
+    /** Reported offset; {@code -1} means "not set". */
+    private long checkpoint = -1;
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getBroker() {
+        return broker;
+    }
+
+    public void setBroker(String broker) {
+        this.broker = broker;
+    }
+
+    public int getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
+
+    /**
+     * @return the reported offset, or {@code -1} when none was set
+     */
+    public long getCheckpoint() {
+        return checkpoint;
+    }
+
+    /**
+     * @param checkpoint the offset to report; {@code -1} marks the event as carrying no offset
+     */
+    public void setCheckpoint(long checkpoint) {
+        this.checkpoint = checkpoint;
+    }
+}
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/common/lock/SpinLock.java b/src/main/java/org/apache/flink/connector/rocketmq/common/lock/SpinLock.java
new file mode 100644
index 0000000..dbb32c2
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/rocketmq/common/lock/SpinLock.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rocketmq.common.lock;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A minimal, non-reentrant spin lock backed by an {@link AtomicBoolean}.
+ *
+ * <p>There is no owner tracking: calling {@link #lock()} twice from the same thread without an
+ * intervening {@link #unlock()} spins forever, and {@link #unlock()} releases the lock no matter
+ * which thread calls it. Always pair {@code lock()} with {@code unlock()} in a {@code finally}
+ * block.
+ */
+public class SpinLock {
+    private final AtomicBoolean lock = new AtomicBoolean(false);
+
+    /** Busy-waits until the lock has been acquired by the calling thread. */
+    public void lock() {
+        // Yield between attempts so a waiting thread hands the CPU to the current holder
+        // instead of hammering the flag with back-to-back CAS operations.
+        while (!lock.compareAndSet(false, true)) {
+            Thread.yield();
+        }
+    }
+
+    /** Releases the lock unconditionally. */
+    public void unlock() {
+        lock.set(false);
+    }
+}
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
index bedf97f..91d1d39 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
@@ -537,7 +537,8 @@
}
}
- public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues) throws MQClientException {
+ public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues)
+ throws MQClientException {
Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");
restoredOffsets.forEach(
(mq, offset) -> {
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java b/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
index 317031f..a76a198 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
@@ -17,7 +17,6 @@
package org.apache.flink.connector.rocketmq.source;
-import com.alibaba.fastjson.JSON;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector;
@@ -26,6 +25,8 @@
import org.apache.flink.connector.rocketmq.source.util.UtilAll;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.StringUtils;
+
+import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/RocketMQSource.java b/src/main/java/org/apache/flink/connector/rocketmq/source/RocketMQSource.java
index 25fd8c7..cfdc0a1 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/RocketMQSource.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/RocketMQSource.java
@@ -124,13 +124,14 @@
final RocketMQSourceReaderMetrics rocketMQSourceReaderMetrics =
new RocketMQSourceReaderMetrics(readerContext.metricGroup());
- Supplier<SplitReader<MessageView, RocketMQSourceSplit>> splitReaderSupplier =
- () ->
- new RocketMQSplitReader<>(
- configuration,
- readerContext,
- deserializationSchema,
- rocketMQSourceReaderMetrics);
+ // unique reader
+ RocketMQSplitReader<OUT> reader =
+ new RocketMQSplitReader<>(
+ configuration,
+ readerContext,
+ deserializationSchema,
+ rocketMQSourceReaderMetrics);
+ Supplier<SplitReader<MessageView, RocketMQSourceSplit>> splitReaderSupplier = () -> reader;
RocketMQSourceFetcherManager rocketmqSourceFetcherManager =
new RocketMQSourceFetcherManager(
@@ -145,7 +146,8 @@
recordEmitter,
configuration,
readerContext,
- rocketMQSourceReaderMetrics);
+ rocketMQSourceReaderMetrics,
+ splitReaderSupplier);
}
@Override
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
index 805df1b..960e622 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
@@ -18,8 +18,9 @@
package org.apache.flink.connector.rocketmq.source.enumerator;
-import com.alibaba.fastjson.JSON;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java
index 77c0c33..c228309 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java
@@ -18,16 +18,21 @@
package org.apache.flink.connector.rocketmq.source.enumerator;
-import com.alibaba.fastjson.JSON;
-import com.google.common.collect.Sets;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.rocketmq.common.event.SourceCheckEvent;
+import org.apache.flink.connector.rocketmq.common.event.SourceDetectEvent;
+import org.apache.flink.connector.rocketmq.common.event.SourceInitAssignEvent;
+import org.apache.flink.connector.rocketmq.common.event.SourceReportOffsetEvent;
+import org.apache.flink.connector.rocketmq.common.lock.SpinLock;
import org.apache.flink.connector.rocketmq.source.InnerConsumer;
import org.apache.flink.connector.rocketmq.source.InnerConsumerImpl;
import org.apache.flink.connector.rocketmq.source.RocketMQSourceOptions;
@@ -36,18 +41,26 @@
import org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector;
import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
import org.apache.flink.util.FlinkRuntimeException;
+
+import com.alibaba.fastjson.JSON;
+import com.google.common.collect.Sets;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -69,18 +82,31 @@
private final OffsetsSelector startingOffsetsSelector;
private final OffsetsSelector stoppingOffsetsSelector;
+ // Used for queue dynamic allocation
+ private final Map<MessageQueue, Long> checkedOffsets;
+ private boolean[] initTask;
+
// The internal states of the enumerator.
// This set is only accessed by the partition discovery callable in the callAsync() method.
// The current assignment by reader id. Only accessed by the coordinator thread.
// The discovered and initialized partition splits that are waiting for owner reader to be
// ready.
- private final Set<MessageQueue> allocatedSet;
+ private final Map<MessageQueue, Byte> allocatedSet;
private final Map<Integer, Set<RocketMQSourceSplit>> pendingSplitAssignmentMap;
+ // Only maintains the mapping from task id to the splits assigned to it
+ private final Map<Integer, Set<RocketMQSourceSplit>> assignedMap;
+ private final Map<MessageQueue, Integer /* taskId */> reflectedQueueToTaskId;
+
// Param from configuration
private final String groupId;
private final long partitionDiscoveryIntervalMs;
+ // Indicates the number of allocated queues
+ private int partitionId;
+ private final SpinLock lock;
+ private ScheduledExecutorService scheduledExecutorService;
+
public RocketMQSourceEnumerator(
OffsetsSelector startingOffsetsSelector,
OffsetsSelector stoppingOffsetsSelector,
@@ -107,13 +133,19 @@
this.configuration = configuration;
this.context = context;
this.boundedness = boundedness;
+ this.lock = new SpinLock();
// Support allocate splits to reader
+ this.checkedOffsets = new ConcurrentHashMap<>();
+ this.reflectedQueueToTaskId = new ConcurrentHashMap<>();
this.pendingSplitAssignmentMap = new ConcurrentHashMap<>();
- this.allocatedSet = new HashSet<>(currentSplitAssignment);
+ this.allocatedSet = new ConcurrentHashMap<>();
+ this.assignedMap = new ConcurrentHashMap<>();
this.allocateStrategy =
AllocateStrategyFactory.getStrategy(
- configuration, context, new RocketMQSourceEnumState(allocatedSet));
+ configuration,
+ context,
+ new RocketMQSourceEnumState(currentSplitAssignment));
// For rocketmq setting
this.groupId = configuration.getString(RocketMQSourceOptions.CONSUMER_GROUP);
@@ -121,12 +153,23 @@
this.stoppingOffsetsSelector = stoppingOffsetsSelector;
this.partitionDiscoveryIntervalMs =
configuration.getLong(RocketMQSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS);
+ this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+
+ // Initialize the task status
+ log.info(
+ "Starting the RocketMQSourceEnumerator with current split assignment: {}",
+ currentSplitAssignment);
+ if (!currentSplitAssignment.isEmpty()) {
+ this.initTask = new boolean[context.currentParallelism()];
+ }
}
@Override
public void start() {
consumer = new InnerConsumerImpl(configuration);
consumer.start();
+ scheduledExecutorService.scheduleAtFixedRate(
+ this::notifyAssignResult, 30 * 1000, 30 * 1000, TimeUnit.MILLISECONDS);
if (partitionDiscoveryIntervalMs > 0) {
log.info(
@@ -190,7 +233,7 @@
@Override
public RocketMQSourceEnumState snapshotState(long checkpointId) {
- return new RocketMQSourceEnumState(allocatedSet);
+ return new RocketMQSourceEnumState(allocatedSet.keySet()); // only the queues are snapshotted; the map values are placeholder bytes
}
@Override
@@ -205,9 +248,32 @@
}
}
+    /**
+     * Dispatches a custom event received from a reader. Events other than
+     * {@link SourceReportOffsetEvent} and {@link SourceInitAssignEvent} are ignored.
+     *
+     * @param taskId id of the reader subtask that sent the event
+     * @param sourceEvent the received event
+     */
+    @Override
+    public void handleSourceEvent(int taskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SourceReportOffsetEvent) {
+            handleOffsetEvent(taskId, (SourceReportOffsetEvent) sourceEvent);
+            return;
+        }
+        if (sourceEvent instanceof SourceInitAssignEvent) {
+            handleInitAssignEvent(taskId, (SourceInitAssignEvent) sourceEvent);
+        }
+    }
+
// ----------------- private methods -------------------
private Set<MessageQueue> requestServiceDiscovery() {
+ // Ensure all subtasks have been initialized
+ try {
+ if (initTask != null) {
+ for (int i = 0; i < context.currentParallelism(); i++) {
+ if (!initTask[i]) {
+ context.sendEventToSourceReader(i, new SourceDetectEvent());
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("init request resend error, please check task has started");
+ return null;
+ }
+
Set<String> topicSet =
Sets.newHashSet(
configuration
@@ -235,6 +301,9 @@
if (t != null) {
throw new FlinkRuntimeException("Failed to handle source splits change due to ", t);
}
+ if (latestSet == null) {
+ return;
+ }
final SourceChangeResult sourceChangeResult = getSourceChangeResult(latestSet);
if (sourceChangeResult.isEmpty()) {
@@ -248,30 +317,59 @@
// This method should only be invoked in the coordinator executor thread.
private SourceSplitChangeResult initializeSourceSplits(SourceChangeResult sourceChangeResult) {
+ lock.lock(); // NOTE(review): not released in this method; the visible unlock is the finally-block of the split-change handler. Confirm the lock is also released when anything below throws.
+
Set<MessageQueue> increaseSet = sourceChangeResult.getIncreaseSet();
+ Set<MessageQueue> decreaseSet = sourceChangeResult.getDecreaseSet();
OffsetsSelector.MessageQueueOffsetsRetriever offsetsRetriever =
new InnerConsumerImpl.RemotingOffsetsRetrieverImpl(consumer);
- Map<MessageQueue, Long> startingOffsets =
+ Map<MessageQueue, Long> increaseStartingOffsets =
startingOffsetsSelector.getMessageQueueOffsets(increaseSet, offsetsRetriever);
- Map<MessageQueue, Long> stoppingOffsets =
+ Map<MessageQueue, Long> increaseStoppingOffsets =
stoppingOffsetsSelector.getMessageQueueOffsets(increaseSet, offsetsRetriever);
+ Map<MessageQueue, Long> decreaseStoppingOffsets =
+ stoppingOffsetsSelector.getMessageQueueOffsets(decreaseSet, offsetsRetriever);
+ Map<MessageQueue, Long> decreaseStartingOffsets =
+ startingOffsetsSelector.getMessageQueueOffsets(decreaseSet, offsetsRetriever);
Set<RocketMQSourceSplit> increaseSplitSet =
increaseSet.stream()
.map(
mq -> {
- long startingOffset = startingOffsets.get(mq);
+ long startingOffset = increaseStartingOffsets.get(mq);
long stoppingOffset =
- stoppingOffsets.getOrDefault(
+ increaseStoppingOffsets.getOrDefault(
mq, RocketMQSourceSplit.NO_STOPPING_OFFSET);
return new RocketMQSourceSplit(
mq, startingOffset, stoppingOffset);
})
.collect(Collectors.toSet());
+ // Update cache: remember the starting offset of every newly discovered queue in checkedOffsets
+ increaseSet.forEach(
+ mq ->
+ checkedOffsets.put(
+ mq,
+ increaseStartingOffsets.getOrDefault(
+ mq, RocketMQSourceSplit.NO_STOPPING_OFFSET))); // NOTE(review): the stopping-offset sentinel is used as the default for a starting offset; confirm this is intended
- return new SourceSplitChangeResult(increaseSplitSet, sourceChangeResult.getDecreaseSet());
+ Set<RocketMQSourceSplit> decreaseSplitSet =
+ decreaseSet.stream()
+ .map(
+ mq -> {
+ long startingOffset = decreaseStartingOffsets.get(mq);
+ long stoppingOffset =
+ decreaseStoppingOffsets.getOrDefault(
+ mq, RocketMQSourceSplit.NO_STOPPING_OFFSET);
+ allocatedSet.remove(mq); // side effect inside map(): the removed queue is dropped from the local bookkeeping
+ checkedOffsets.remove(mq);
+ return new RocketMQSourceSplit(
+ mq, startingOffset, stoppingOffset, false); // false: presumably marks the split as a removal (read back via getIsIncrease) - confirm
+ })
+ .collect(Collectors.toSet());
+
+ return new SourceSplitChangeResult(increaseSplitSet, decreaseSplitSet);
}
/**
@@ -291,15 +389,145 @@
if (partitionDiscoveryIntervalMs <= 0) {
log.info("Split changes, but dynamic partition discovery is disabled.");
}
- this.calculateSplitAssignment(sourceSplitChangeResult);
- this.sendSplitChangesToRemote(context.registeredReaders().keySet());
+ try {
+ this.calculateSplitAssignment(sourceSplitChangeResult);
+ this.sendSplitChangesToRemote(context.registeredReaders().keySet());
+ } finally {
+ lock.unlock();
+ }
}
/** Calculate new split assignment according allocate strategy */
private void calculateSplitAssignment(SourceSplitChangeResult sourceSplitChangeResult) {
- Map<Integer, Set<RocketMQSourceSplit>> newSourceSplitAllocateMap =
- this.allocateStrategy.allocate(
- sourceSplitChangeResult.getIncreaseSet(), context.currentParallelism());
+ Map<Integer, Set<RocketMQSourceSplit>> newSourceSplitAllocateMap;
+
+ // Preliminary calculation of distribution results
+ {
+ // Allocate valid queues
+ if (sourceSplitChangeResult.decreaseSet != null
+ && !sourceSplitChangeResult.decreaseSet.isEmpty()) {
+ partitionId = 0;
+
+ // Re-load balancing
+ Set<RocketMQSourceSplit> allMQ = new HashSet<>();
+ OffsetsSelector.MessageQueueOffsetsRetriever offsetsRetriever =
+ new InnerConsumerImpl.RemotingOffsetsRetrieverImpl(consumer);
+ Map<MessageQueue, Long> stoppingOffsets =
+ stoppingOffsetsSelector.getMessageQueueOffsets(
+ allocatedSet.keySet(), offsetsRetriever);
+ Set<MessageQueue> delete =
+ sourceSplitChangeResult.decreaseSet.stream()
+ .map(RocketMQSourceSplit::getMessageQueue)
+ .collect(Collectors.toSet());
+
+ // Calculate all queue
+ allMQ.addAll(sourceSplitChangeResult.increaseSet);
+ allocatedSet
+ .keySet()
+ .forEach(
+ mq -> {
+ if (!delete.contains(mq)) {
+ allMQ.add(
+ new RocketMQSourceSplit(
+ mq,
+ checkedOffsets.get(mq),
+ stoppingOffsets.getOrDefault(
+ mq,
+ RocketMQSourceSplit
+ .NO_STOPPING_OFFSET)));
+ }
+ });
+ newSourceSplitAllocateMap =
+ this.allocateStrategy.allocate(
+ allMQ, context.currentParallelism(), partitionId);
+
+ // Update cache
+ assignedMap.clear();
+ for (Map.Entry entry : newSourceSplitAllocateMap.entrySet()) {
+ assignedMap.put(
+ (Integer) entry.getKey(),
+ ((Set<RocketMQSourceSplit>) entry.getValue())
+ .stream()
+ .map(RocketMQSourceSplit::clone)
+ .collect(Collectors.toSet()));
+ }
+ partitionId = allMQ.size();
+ } else {
+ newSourceSplitAllocateMap =
+ this.allocateStrategy.allocate(
+ sourceSplitChangeResult.getIncreaseSet(),
+ context.currentParallelism(),
+ partitionId);
+
+ // Update cache
+ newSourceSplitAllocateMap.forEach(
+ (k, v) ->
+ v.forEach(
+ mq ->
+ assignedMap
+ .computeIfAbsent(k, r -> new HashSet<>())
+ .add(mq)));
+ partitionId += sourceSplitChangeResult.getIncreaseSet().size();
+ }
+
+ // Allocate deleted queues
+ if (sourceSplitChangeResult.decreaseSet != null
+ && !sourceSplitChangeResult.decreaseSet.isEmpty()) {
+ sourceSplitChangeResult.decreaseSet.forEach(
+ mq -> {
+ newSourceSplitAllocateMap
+ .computeIfAbsent(
+ reflectedQueueToTaskId.get(mq.getMessageQueue()),
+ k -> new HashSet<>())
+ .add(mq);
+ reflectedQueueToTaskId.remove(mq.getMessageQueue());
+ });
+ }
+ }
+
+ {
+ // Calculate the result after queue migration
+ if (sourceSplitChangeResult.decreaseSet != null
+ && !sourceSplitChangeResult.decreaseSet.isEmpty()) {
+ Map<Integer, Set<RocketMQSourceSplit>> migrationQueue = new HashMap<>();
+ Map<Integer, Set<RocketMQSourceSplit>> noMigrationQueue = new HashMap<>();
+ for (Map.Entry entry : newSourceSplitAllocateMap.entrySet()) {
+ int taskId = (int) entry.getKey();
+ Set<RocketMQSourceSplit> splits = (Set<RocketMQSourceSplit>) entry.getValue();
+ for (RocketMQSourceSplit split : splits) {
+ if (!split.getIsIncrease()) {
+ continue;
+ }
+ if (taskId != reflectedQueueToTaskId.get(split.getMessageQueue())) {
+ migrationQueue
+ .computeIfAbsent(
+ reflectedQueueToTaskId.get(split.getMessageQueue()),
+ k -> new HashSet<>())
+ .add(
+ new RocketMQSourceSplit(
+ split.getMessageQueue(),
+ split.getStartingOffset(),
+ split.getStoppingOffset(),
+ false));
+ } else {
+ noMigrationQueue
+ .computeIfAbsent(taskId, k -> new HashSet<>())
+ .add(split);
+ }
+ }
+ }
+
+ // finally result
+ migrationQueue.forEach(
+ (taskId, splits) -> {
+ newSourceSplitAllocateMap.get(taskId).addAll(splits);
+ });
+ noMigrationQueue.forEach(
+ (taskId, splits) -> {
+ newSourceSplitAllocateMap.get(taskId).removeAll(splits);
+ });
+ }
+ }
for (Map.Entry<Integer, Set<RocketMQSourceSplit>> entry :
newSourceSplitAllocateMap.entrySet()) {
@@ -330,7 +558,12 @@
.computeIfAbsent(pendingReader, k -> new ArrayList<>())
.addAll(pendingAssignmentForReader);
pendingAssignmentForReader.forEach(
- split -> this.allocatedSet.add(split.getMessageQueue()));
+ split -> {
+ if (split.getIsIncrease()) {
+ this.allocatedSet.put(split.getMessageQueue(), (byte) 1);
+ reflectedQueueToTaskId.put(split.getMessageQueue(), pendingReader);
+ }
+ });
}
}
@@ -352,6 +585,86 @@
}
}
+    /**
+     * Rebuilds the assignment bookkeeping for one reader from the splits it reports.
+     *
+     * <p>Does nothing when init tracking is disabled ({@code initTask == null}, i.e. the
+     * enumerator started without a restored assignment) or when this reader has already been
+     * synchronized. Otherwise every reported split is recorded in {@code assignedMap},
+     * {@code checkedOffsets}, {@code reflectedQueueToTaskId} and {@code allocatedSet}, and the
+     * reader is marked as initialized.
+     *
+     * @param taskId id of the reporting reader subtask
+     * @param initAssignEvent event carrying the reader's current splits
+     */
+    private void handleInitAssignEvent(int taskId, SourceInitAssignEvent initAssignEvent) {
+        boolean nothingToSync = this.initTask == null || this.initTask[taskId];
+        if (nothingToSync) {
+            return;
+        }
+        lock.lock();
+        try {
+            // sync assign result
+            List<RocketMQSourceSplit> reported = initAssignEvent.getSplits();
+            if (reported != null && !reported.isEmpty()) {
+                log.info(
+                        "Received SourceInitAssignEvent from reader {} with {} splits.",
+                        taskId,
+                        reported.toString());
+                for (RocketMQSourceSplit split : reported) {
+                    MessageQueue queue = split.getMessageQueue();
+                    this.assignedMap.computeIfAbsent(taskId, r -> new HashSet<>()).add(split);
+                    // NOTE(review): checkedOffsets is filled with *starting* offsets elsewhere
+                    // in this class, but with the stopping offset here - confirm.
+                    this.checkedOffsets.put(queue, split.getStoppingOffset());
+                    this.reflectedQueueToTaskId.put(queue, taskId);
+                    this.allocatedSet.put(queue, (byte) 1);
+                }
+            }
+            this.initTask[taskId] = true;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Stores the offset reported by a reader for one message queue in {@code checkedOffsets}.
+     *
+     * <p>Events that are {@code null} or carry the {@code -1} "no offset" marker are ignored.
+     *
+     * @param taskId id of the reporting reader subtask (used for logging only)
+     * @param sourceReportOffsetEvent event identifying the queue and the reported offset
+     */
+    private void handleOffsetEvent(int taskId, SourceReportOffsetEvent sourceReportOffsetEvent) {
+        lock.lock();
+        try {
+            if (sourceReportOffsetEvent == null || sourceReportOffsetEvent.getCheckpoint() == -1) {
+                return;
+            }
+            // Update offset of message queue
+            long reportedOffset = sourceReportOffsetEvent.getCheckpoint();
+            log.info(
+                    "Received SourceReportOffsetEvent from reader {} with offset {}",
+                    taskId,
+                    reportedOffset);
+            String topic = sourceReportOffsetEvent.getTopic();
+            String broker = sourceReportOffsetEvent.getBroker();
+            int queueId = sourceReportOffsetEvent.getQueueId();
+            this.checkedOffsets.put(new MessageQueue(topic, broker, queueId), reportedOffset);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Periodic task: sends every reader in {@code assignedMap} a {@link SourceCheckEvent} listing
+     * its queues together with their (starting offset, stopping offset) pairs.
+     *
+     * <p>A failure to notify one reader is logged and does not stop the remaining readers from
+     * being notified. Nothing is propagated to the scheduler, because a task scheduled with
+     * {@code scheduleAtFixedRate} is silently cancelled for good once a single run throws.
+     */
+    private void notifyAssignResult() {
+        if (assignedMap.isEmpty()) {
+            return;
+        }
+        lock.lock();
+        try {
+            for (Map.Entry<Integer, Set<RocketMQSourceSplit>> entry : assignedMap.entrySet()) {
+                try {
+                    Map<MessageQueue, Tuple2<Long, Long>> assignedMq = new HashMap<>();
+                    for (RocketMQSourceSplit split : entry.getValue()) {
+                        assignedMq.put(
+                                split.getMessageQueue(),
+                                new Tuple2<>(
+                                        split.getStartingOffset(), split.getStoppingOffset()));
+                    }
+                    SourceCheckEvent sourceCheckEvent = new SourceCheckEvent();
+                    sourceCheckEvent.setAssignedMq(assignedMq);
+                    context.sendEventToSourceReader(entry.getKey(), sourceCheckEvent);
+                } catch (RuntimeException e) {
+                    // Keep the periodic task alive; the next run retries this reader.
+                    log.warn("Failed to send SourceCheckEvent to reader {}", entry.getKey(), e);
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
/** A container class to hold the newly added partitions and removed partitions. */
@VisibleForTesting
private static class SourceChangeResult {
@@ -380,7 +693,7 @@
public static class SourceSplitChangeResult {
private final Set<RocketMQSourceSplit> increaseSet;
- private final Set<MessageQueue> decreaseSet;
+ private final Set<RocketMQSourceSplit> decreaseSet;
private SourceSplitChangeResult(Set<RocketMQSourceSplit> increaseSet) {
this.increaseSet = Collections.unmodifiableSet(increaseSet);
@@ -388,7 +701,7 @@
}
private SourceSplitChangeResult(
- Set<RocketMQSourceSplit> increaseSet, Set<MessageQueue> decreaseSet) {
+ Set<RocketMQSourceSplit> increaseSet, Set<RocketMQSourceSplit> decreaseSet) {
this.increaseSet = Collections.unmodifiableSet(increaseSet);
this.decreaseSet = Collections.unmodifiableSet(decreaseSet);
}
@@ -397,14 +710,14 @@
return increaseSet;
}
- public Set<MessageQueue> getDecreaseSet() {
+ public Set<RocketMQSourceSplit> getDecreaseSet() {
return decreaseSet;
}
}
@VisibleForTesting
private SourceChangeResult getSourceChangeResult(Set<MessageQueue> latestSet) {
- Set<MessageQueue> currentSet = Collections.unmodifiableSet(this.allocatedSet);
+ Set<MessageQueue> currentSet = Collections.unmodifiableSet(this.allocatedSet.keySet());
Set<MessageQueue> increaseSet = Sets.difference(latestSet, currentSet);
Set<MessageQueue> decreaseSet = Sets.difference(currentSet, latestSet);
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategy.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategy.java
index bfd814e..6386b22 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategy.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategy.java
@@ -44,4 +44,17 @@
*/
Map<Integer, Set<RocketMQSourceSplit>> allocate(
final Collection<RocketMQSourceSplit> mqAll, final int parallelism);
+
+ /**
+ * Allocates RocketMQ source splits to Flink tasks based on the selected allocation strategy,
+ * taking into account how many queues were allocated before this call.
+ *
+ * @param mqAll a collection of all available RocketMQ source splits
+ * @param parallelism the desired parallelism for the Flink tasks
+ * @param globalAssignedNumber number of queues already allocated before this call; how it is
+ * used is up to the implementation (AverageAllocateStrategy, for example, uses it as the
+ * partition index of the first split in {@code mqAll})
+ * @return a map of task indices to sets of corresponding RocketMQ source splits
+ */
+ Map<Integer, Set<RocketMQSourceSplit>> allocate(
+ final Collection<RocketMQSourceSplit> mqAll,
+ final int parallelism,
+ int globalAssignedNumber);
}
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategyFactory.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategyFactory.java
index 6c9d723..442afb9 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategyFactory.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategyFactory.java
@@ -27,6 +27,7 @@
public static final String STRATEGY_NAME_BROADCAST = "broadcast";
public static final String STRATEGY_NAME_CONSISTENT_HASH = "hash";
+ public static final String STRATEGY_NAME_AVERAGE = "average";
private AllocateStrategyFactory() {
// No public constructor.
@@ -46,6 +47,8 @@
return new ConsistentHashAllocateStrategy();
case STRATEGY_NAME_BROADCAST:
return new BroadcastAllocateStrategy();
+ case STRATEGY_NAME_AVERAGE:
+ return new AverageAllocateStrategy();
default:
throw new IllegalArgumentException(
"We don't support this allocate strategy: " + allocateStrategyName);
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategy.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategy.java
new file mode 100644
index 0000000..f808133
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategy.java
@@ -0,0 +1,43 @@
+package org.apache.flink.connector.rocketmq.source.enumerator.allocate;
+
+import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Allocation strategy that places each split at a running position on the ring of readers.
+ *
+ * <p>The owner of a split is {@code (start + position) mod parallelism}, where {@code start} is
+ * derived from the hash of the split's topic and {@code position} is a counter that begins at
+ * the number of queues allocated so far and grows by one for every split handled.
+ */
+public class AverageAllocateStrategy implements AllocateStrategy {
+
+    @Override
+    public String getStrategyName() {
+        return AllocateStrategyFactory.STRATEGY_NAME_AVERAGE;
+    }
+
+    /**
+     * Allocates the splits as if nothing had been allocated before.
+     *
+     * @param mqAll the splits to allocate
+     * @param parallelism the number of readers
+     * @return a map from reader index to the splits it owns; never {@code null}
+     */
+    @Override
+    public Map<Integer, Set<RocketMQSourceSplit>> allocate(
+            Collection<RocketMQSourceSplit> mqAll, int parallelism) {
+        // Previously returned null, which breaks any caller that iterates the result.
+        return allocate(mqAll, parallelism, 0);
+    }
+
+    /**
+     * Allocates the splits, continuing the running position from {@code globalAssignedNumber}.
+     *
+     * @param mqAll the splits to allocate in this call
+     * @param parallelism the number of readers
+     * @param globalAssignedNumber position to start from, i.e. the number of queues already
+     *     allocated by earlier calls
+     * @return a map from reader index to the splits it owns; readers without splits have no entry
+     */
+    @Override
+    public Map<Integer, Set<RocketMQSourceSplit>> allocate(
+            Collection<RocketMQSourceSplit> mqAll, int parallelism, int globalAssignedNumber) {
+        Map<Integer, Set<RocketMQSourceSplit>> result = new HashMap<>();
+        int position = globalAssignedNumber;
+        for (RocketMQSourceSplit mq : mqAll) {
+            int readerIndex = getSplitOwner(mq.getTopic(), position++, parallelism);
+            result.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(mq);
+        }
+        return result;
+    }
+
+    /**
+     * Computes the reader index for one split.
+     *
+     * @param topic topic of the split; its hash selects the start index on the ring
+     * @param position running allocation counter (not the queue id), used as the clockwise
+     *     offset from the start index
+     * @param numReaders the number of readers
+     * @return an index in {@code [0, numReaders)}
+     */
+    private int getSplitOwner(String topic, int position, int numReaders) {
+        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
+
+        // floorMod keeps the index non-negative even if the sum wraps around Integer.MAX_VALUE;
+        // for non-overflowing, non-negative inputs it equals the plain remainder.
+        return Math.floorMod(startIndex + position, numReaders);
+    }
+}
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/BroadcastAllocateStrategy.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/BroadcastAllocateStrategy.java
index 2e46419..f37cf04 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/BroadcastAllocateStrategy.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/BroadcastAllocateStrategy.java
@@ -40,4 +40,10 @@
}
return result;
}
+
+ /**
+ * Three-argument overload of the allocation entry point. {@code globalAssignedNumber} is not
+ * used by this strategy; the call is forwarded to the two-argument {@code allocate}.
+ */
+ @Override
+ public Map<Integer, Set<RocketMQSourceSplit>> allocate(
+ Collection<RocketMQSourceSplit> mqAll, int parallelism, int globalAssignedNumber) {
+ return allocate(mqAll, parallelism);
+ }
}
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/ConsistentHashAllocateStrategy.java b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/ConsistentHashAllocateStrategy.java
index 6a3ad2c..7cd10c4 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/ConsistentHashAllocateStrategy.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/ConsistentHashAllocateStrategy.java
@@ -48,4 +48,10 @@
}
return result;
}
+
+ /**
+ * Three-argument overload of the allocation entry point. {@code globalAssignedNumber} is not
+ * used by this strategy; the call is forwarded to the two-argument {@code allocate}.
+ */
+ @Override
+ public Map<Integer, Set<RocketMQSourceSplit>> allocate(
+ Collection<RocketMQSourceSplit> mqAll, int parallelism, int globalAssignedNumber) {
+ return allocate(mqAll, parallelism);
+ }
}
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/metrics/RocketMQSourceReaderMetrics.java b/src/main/java/org/apache/flink/connector/rocketmq/source/metrics/RocketMQSourceReaderMetrics.java
index 09c1049..57c8a63 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/metrics/RocketMQSourceReaderMetrics.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/metrics/RocketMQSourceReaderMetrics.java
@@ -45,4 +45,6 @@
public RocketMQSourceReaderMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) {}
public void registerNewMessageQueue(MessageQueue messageQueue) {}
+
+ /** Invoked when a message queue is released by the split reader; currently does nothing. */
+ public void unregisterMessageQueue(MessageQueue messageQueue) {}
}
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceFetcherManager.java b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceFetcherManager.java
index a635b17..242e19e 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceFetcherManager.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceFetcherManager.java
@@ -95,4 +95,8 @@
public void wakeUp() {}
});
}
+
+ /**
+ * Returns the split reader of the fetcher obtained via {@code fetchers.get(0)}, cast to
+ * {@link RocketMQSplitReader}.
+ *
+ * <p>NOTE(review): presumably {@code fetchers.get(0)} yields nothing until a fetcher has been
+ * created, in which case this call would fail; confirm that callers only use it afterwards.
+ */
+ public RocketMQSplitReader getSplitReader() {
+ return (RocketMQSplitReader) fetchers.get(0).getSplitReader();
+ }
}
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.java b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.java
index 9303f35..042c12a 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.java
@@ -19,30 +19,43 @@
package org.apache.flink.connector.rocketmq.source.reader;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.rocketmq.common.event.SourceCheckEvent;
+import org.apache.flink.connector.rocketmq.common.event.SourceDetectEvent;
+import org.apache.flink.connector.rocketmq.common.event.SourceInitAssignEvent;
+import org.apache.flink.connector.rocketmq.common.event.SourceReportOffsetEvent;
import org.apache.flink.connector.rocketmq.source.RocketMQSourceOptions;
import org.apache.flink.connector.rocketmq.source.metrics.RocketMQSourceReaderMetrics;
import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplitState;
import org.apache.flink.connector.rocketmq.source.util.UtilAll;
+import com.google.common.collect.Sets;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
/** The source reader for RocketMQ partitions. */
public class RocketMQSourceReader<T>
@@ -57,6 +70,7 @@
private final SortedMap<Long, Map<MessageQueue, Long>> offsetsToCommit;
private final ConcurrentMap<MessageQueue, Long> offsetsOfFinishedSplits;
private final RocketMQSourceReaderMetrics rocketmqSourceReaderMetrics;
+ private final RocketMQSplitReader reader;
public RocketMQSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<MessageView>> elementsQueue,
@@ -64,7 +78,8 @@
RecordEmitter<MessageView, T, RocketMQSourceSplitState> recordEmitter,
Configuration config,
SourceReaderContext context,
- RocketMQSourceReaderMetrics rocketMQSourceReaderMetrics) {
+ RocketMQSourceReaderMetrics rocketMQSourceReaderMetrics,
+ Supplier<SplitReader<MessageView, RocketMQSourceSplit>> readerSupplier) {
super(elementsQueue, rocketmqSourceFetcherManager, recordEmitter, config, context);
this.offsetsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
@@ -72,6 +87,7 @@
this.commitOffsetsOnCheckpoint =
config.get(RocketMQSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT);
this.rocketmqSourceReaderMetrics = rocketMQSourceReaderMetrics;
+ this.reader = (RocketMQSplitReader) readerSupplier.get();
}
@Override
@@ -136,10 +152,91 @@
@Override
protected RocketMQSourceSplit toSplitType(String splitId, RocketMQSourceSplitState splitState) {
+ // Report checkpoint progress.
+ SourceReportOffsetEvent sourceEvent = new SourceReportOffsetEvent();
+ sourceEvent.setBroker(splitState.getBrokerName());
+ sourceEvent.setTopic(splitState.getTopic());
+ sourceEvent.setQueueId(splitState.getQueueId());
+ sourceEvent.setCheckpoint(splitState.getCurrentOffset());
+ context.sendSourceEventToCoordinator(sourceEvent);
+ LOG.info("Report checkpoint progress: {}", sourceEvent);
return splitState.getSourceSplit();
}
+    /**
+     * Dispatches a custom source event received by this reader.
+     *
+     * <p>{@link SourceDetectEvent} and {@link SourceCheckEvent} are handled; any other event type
+     * is ignored.
+     *
+     * @param sourceEvent the event to handle
+     */
+    @Override
+    public void handleSourceEvents(SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SourceDetectEvent) {
+            handleSourceDetectEvent();
+            return;
+        }
+        if (sourceEvent instanceof SourceCheckEvent) {
+            handleSourceCheckEvent((SourceCheckEvent) sourceEvent);
+        }
+    }
+
// ------------------------
+    /**
+     * Answers a detect event: reports every queue in the split reader's current offset table to
+     * the coordinator as a {@link SourceInitAssignEvent}, then marks the split reader's init as
+     * finished. An empty table is reported as an empty split list.
+     */
+    private void handleSourceDetectEvent() {
+        List<RocketMQSourceSplit> ownedSplits = new LinkedList<>();
+        // Iterating an empty table adds nothing, so no separate emptiness check is needed.
+        for (Map.Entry<MessageQueue, Tuple2<Long, Long>> owned :
+                reader.getCurrentOffsetTable().entrySet()) {
+            Tuple2<Long, Long> offsets = owned.getValue();
+            ownedSplits.add(new RocketMQSourceSplit(owned.getKey(), offsets.f0, offsets.f1));
+        }
+
+        SourceInitAssignEvent initAssignEvent = new SourceInitAssignEvent();
+        initAssignEvent.setSplits(ownedSplits);
+        context.sendSourceEventToCoordinator(initAssignEvent);
+        reader.setInitFinish(true);
+    }
+
+    /**
+     * Reconciles the queues owned by this reader with the assignment carried by the event.
+     *
+     * <p>Queues that are assigned but not yet owned are added with the offsets from the event.
+     * Queues that are owned but no longer assigned are released with the offsets tracked by the
+     * split reader: the event has no entry for such queues, so looking them up in the event's map
+     * returns {@code null} and previously caused a {@link NullPointerException}.
+     *
+     * @param sourceEvent event holding the assigned queues and their (start, stop) offsets
+     */
+    private void handleSourceCheckEvent(SourceCheckEvent sourceEvent) {
+        Map<MessageQueue, Tuple2<Long, Long>> checkMap = sourceEvent.getAssignedMq();
+        Map<MessageQueue, Tuple2<Long, Long>> currentMap = reader.getCurrentOffsetTable();
+        Set<MessageQueue> assignedMq = checkMap.keySet();
+
+        // Build both split lists up front, before the split reader's table is modified.
+        List<RocketMQSourceSplit> increase =
+                Sets.difference(assignedMq, currentMap.keySet()).stream()
+                        .map(
+                                mq -> {
+                                    Tuple2<Long, Long> offsets = checkMap.get(mq);
+                                    return new RocketMQSourceSplit(
+                                            mq, offsets.f0, offsets.f1, true);
+                                })
+                        .collect(Collectors.toList());
+        // Walk the entries of the owned table so each released queue comes with its own offsets.
+        List<RocketMQSourceSplit> decrease =
+                currentMap.entrySet().stream()
+                        .filter(owned -> !assignedMq.contains(owned.getKey()))
+                        .map(
+                                owned -> {
+                                    Tuple2<Long, Long> offsets = owned.getValue();
+                                    return new RocketMQSourceSplit(
+                                            owned.getKey(), offsets.f0, offsets.f1, false);
+                                })
+                        .collect(Collectors.toList());
+
+        if (increase.isEmpty() && decrease.isEmpty()) {
+            LOG.info("No need to checkpoint, current assigned mq is same as before.");
+            return;
+        }
+        if (!increase.isEmpty()) {
+            reader.handleSplitsChanges(new SplitsAddition<>(increase));
+        }
+        if (!decrease.isEmpty()) {
+            reader.handleSplitsChanges(new SplitsAddition<>(decrease));
+        }
+    }
@VisibleForTesting
SortedMap<Long, Map<MessageQueue, Long>> getOffsetsToCommit() {
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java
index 3a07966..9a0833c 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java
@@ -27,6 +27,8 @@
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.rocketmq.common.config.RocketMQOptions;
+import org.apache.flink.connector.rocketmq.common.event.SourceInitAssignEvent;
+import org.apache.flink.connector.rocketmq.common.lock.SpinLock;
import org.apache.flink.connector.rocketmq.source.InnerConsumer;
import org.apache.flink.connector.rocketmq.source.InnerConsumerImpl;
import org.apache.flink.connector.rocketmq.source.RocketMQSourceOptions;
@@ -36,11 +38,13 @@
import org.apache.flink.connector.rocketmq.source.util.UtilAll;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
+
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
@@ -81,6 +85,11 @@
private final ConcurrentMap<MessageQueue, Tuple2<Long, Long>> currentOffsetTable;
private final RocketMQSourceReaderMetrics rocketmqSourceReaderMetrics;
+ // Init status : true-finish init; false-not finish init
+ private boolean initFinish;
+ private final RocketMQRecordsWithSplitIds<MessageView> recordsWithSplitIds;
+ private final SpinLock lock;
+
public RocketMQSplitReader(
Configuration configuration,
SourceReaderContext sourceReaderContext,
@@ -92,6 +101,9 @@
this.deserializationSchema = deserializationSchema;
this.offsetsToCommit = new TreeMap<>();
this.currentOffsetTable = new ConcurrentHashMap<>();
+ this.recordsWithSplitIds = new RocketMQRecordsWithSplitIds<>(rocketmqSourceReaderMetrics);
+ this.initFinish = false;
+ this.lock = new SpinLock();
this.consumer = new InnerConsumerImpl(configuration);
this.consumer.start();
@@ -103,10 +115,18 @@
@Override
public RecordsWithSplitIds<MessageView> fetch() throws IOException {
+ lock.lock();
wakeup = false;
RocketMQRecordsWithSplitIds<MessageView> recordsWithSplitIds =
new RocketMQRecordsWithSplitIds<>(rocketmqSourceReaderMetrics);
try {
+ this.recordsWithSplitIds.finishedSplits.forEach(
+ splitId -> recordsWithSplitIds.addFinishedSplit(splitId));
+ this.recordsWithSplitIds.finishedSplits.clear();
+ } finally {
+ lock.unlock();
+ }
+ try {
Duration duration =
Duration.ofMillis(this.configuration.getLong(RocketMQOptions.POLL_TIMEOUT));
List<MessageView> messageExtList = consumer.poll(duration);
@@ -129,6 +149,7 @@
} catch (Exception e) {
LOG.error("Reader fetch split error", e);
}
+
return recordsWithSplitIds;
}
@@ -142,38 +163,53 @@
splitsChange.getClass()));
}
+ if (!initFinish) {
+ LOG.info("Start to init reader");
+ SourceInitAssignEvent sourceEvent = new SourceInitAssignEvent();
+ sourceEvent.setSplits(splitsChange.splits());
+ sourceReaderContext.sendSourceEventToCoordinator(sourceEvent);
+ initFinish = true;
+ }
+ lock.lock();
+
+ LOG.info("Receive split change: " + splitsChange.splits().toString());
// Assignment.
ConcurrentMap<MessageQueue, Tuple2<Long, Long>> newOffsetTable = new ConcurrentHashMap<>();
- // Set up the stopping timestamps.
- splitsChange
- .splits()
- .forEach(
- split -> {
- MessageQueue messageQueue =
- new MessageQueue(
- split.getTopic(),
- split.getBrokerName(),
- split.getQueueId());
- newOffsetTable.put(
- messageQueue,
- new Tuple2<>(
- split.getStartingOffset(), split.getStoppingOffset()));
- rocketmqSourceReaderMetrics.registerNewMessageQueue(messageQueue);
- });
-
- // todo: log message queue change
-
+ try {
+ // Set up the stopping timestamps.
+ splitsChange
+ .splits()
+ .forEach(
+ split -> {
+ if (!split.getIsIncrease()) {
+ finishSplitAtRecord(
+ split.getMessageQueue(),
+ split.getStoppingOffset(),
+ recordsWithSplitIds);
+ } else {
+ if (!currentOffsetTable.containsKey(split.getMessageQueue())) {
+ registerSplits(split);
+ newOffsetTable.put(
+ split.getMessageQueue(),
+ new Tuple2<>(
+ split.getStartingOffset(),
+ split.getStoppingOffset()));
+ }
+ }
+ });
+ } finally {
+ lock.unlock();
+ }
// It will replace the previous assignment
- Set<MessageQueue> incrementalSplits = newOffsetTable.keySet();
- consumer.assign(incrementalSplits);
+ consumer.assign(currentOffsetTable.keySet());
// set offset to consumer
for (Map.Entry<MessageQueue, Tuple2<Long, Long>> entry : newOffsetTable.entrySet()) {
MessageQueue messageQueue = entry.getKey();
Long startingOffset = entry.getValue().f0;
try {
- consumer.seek(messageQueue, startingOffset);
+ consumer.seek(messageQueue, startingOffset == -1L ? 0L : startingOffset);
} catch (Exception e) {
String info =
String.format(
@@ -207,14 +243,31 @@
}
}
+ /**
+ * Returns the offset table of the queues currently registered with this split reader.
+ *
+ * <p>The returned map is the internal instance, not a copy. Values are stored by
+ * {@code registerSplits} as (starting offset, stopping offset).
+ */
+ public ConcurrentMap<MessageQueue, Tuple2<Long, Long>> getCurrentOffsetTable() {
+ return currentOffsetTable;
+ }
+
+    /**
+     * Records the split's (starting, stopping) offsets in the current offset table and registers
+     * its message queue with the reader metrics.
+     *
+     * @param split the split to register
+     */
+    private void registerSplits(RocketMQSourceSplit split) {
+        LOG.info("Register split {}", split.splitId());
+        MessageQueue messageQueue = split.getMessageQueue();
+        Tuple2<Long, Long> offsets =
+                new Tuple2<>(split.getStartingOffset(), split.getStoppingOffset());
+        this.currentOffsetTable.put(messageQueue, offsets);
+        this.rocketmqSourceReaderMetrics.registerNewMessageQueue(messageQueue);
+    }
+
+ /**
+ * Sets the init flag. While the flag is {@code false}, {@code handleSplitsChanges} reports the
+ * received splits to the coordinator once and then sets the flag itself.
+ *
+ * <p>NOTE(review): the flag is a plain {@code boolean} field; if this setter and
+ * {@code handleSplitsChanges} run on different threads the write may not be visible to the
+ * other thread. Confirm the threading model.
+ */
+ public void setInitFinish(boolean initFinish) {
+ this.initFinish = initFinish;
+ }
+
private void finishSplitAtRecord(
MessageQueue messageQueue,
long currentOffset,
RocketMQRecordsWithSplitIds<MessageView> recordsBySplits) {
-
LOG.info("message queue {} has reached stopping offset {}", messageQueue, currentOffset);
- // recordsBySplits.addFinishedSplit(getSplitId(messageQueue));
+
this.currentOffsetTable.remove(messageQueue);
+ this.rocketmqSourceReaderMetrics.unregisterMessageQueue(messageQueue);
+ recordsBySplits.addFinishedSplit(RocketMQSourceSplit.toSplitId(messageQueue));
}
// ---------------- private helper class ------------------------
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializer.java b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializer.java
index 36c7a0f..eb37e19 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializer.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializer.java
@@ -47,6 +47,7 @@
out.writeInt(split.getQueueId());
out.writeLong(split.getStartingOffset());
out.writeLong(split.getStoppingOffset());
+ out.writeBoolean(split.getIsIncrease());
out.flush();
return byteArrayOutputStream.toByteArray();
}
@@ -61,8 +62,13 @@
int partition = in.readInt();
long startingOffset = in.readLong();
long stoppingOffset = in.readLong();
+ if (version == SNAPSHOT_VERSION) {
+ return new RocketMQSourceSplit(
+ topic, broker, partition, startingOffset, stoppingOffset);
+ }
+ boolean isIncrease = in.readBoolean();
return new RocketMQSourceSplit(
- topic, broker, partition, startingOffset, stoppingOffset);
+ topic, broker, partition, startingOffset, stoppingOffset, isIncrease);
}
}
}
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplit.java b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplit.java
index 7124086..a94044d 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplit.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplit.java
@@ -37,6 +37,8 @@
private final int queueId;
private final long startingOffset;
private final long stoppingOffset;
+ // whether the split is increase or decrease: true-increase, false-decrease
+ private final boolean isIncrease;
public RocketMQSourceSplit(
MessageQueue messageQueue, long startingOffset, long stoppingOffset) {
@@ -49,6 +51,20 @@
}
public RocketMQSourceSplit(
+ MessageQueue messageQueue,
+ long startingOffset,
+ long stoppingOffset,
+ boolean isIncrease) {
+ this(
+ messageQueue.getTopic(),
+ messageQueue.getBrokerName(),
+ messageQueue.getQueueId(),
+ startingOffset,
+ stoppingOffset,
+ isIncrease);
+ }
+
+ public RocketMQSourceSplit(
String topic,
String brokerName,
int queueId,
@@ -59,6 +75,22 @@
this.queueId = queueId;
this.startingOffset = startingOffset;
this.stoppingOffset = stoppingOffset;
+ this.isIncrease = true;
+ }
+
+ public RocketMQSourceSplit(
+ String topic,
+ String brokerName,
+ int queueId,
+ long startingOffset,
+ long stoppingOffset,
+ boolean isIncrease) {
+ this.topic = topic;
+ this.brokerName = brokerName;
+ this.queueId = queueId;
+ this.startingOffset = startingOffset;
+ this.stoppingOffset = stoppingOffset;
+ this.isIncrease = isIncrease;
}
public String getTopic() {
@@ -85,6 +117,28 @@
return new MessageQueue(topic, brokerName, queueId);
}
+ /**
+ * Returns the direction of this split: {@code true} when it adds a queue, {@code false} when it
+ * releases one.
+ */
+ public boolean getIsIncrease() {
+ return isIncrease;
+ }
+
+    /**
+     * Builds the split id of a message queue: topic, broker name and queue id joined by
+     * {@code SEPARATOR}, in the same layout that {@link #splitId()} produces.
+     *
+     * @param messageQueue the queue to derive the id from
+     * @return the split id
+     */
+    public static String toSplitId(MessageQueue messageQueue) {
+        StringBuilder splitId = new StringBuilder();
+        splitId.append(messageQueue.getTopic());
+        splitId.append(SEPARATOR);
+        splitId.append(messageQueue.getBrokerName());
+        splitId.append(SEPARATOR);
+        splitId.append(messageQueue.getQueueId());
+        return splitId.toString();
+    }
+
+ /**
+ * Creates a new split carrying the same topic, broker name, queue id, starting offset, stopping
+ * offset and increase flag as the given split.
+ *
+ * @param split the split to copy
+ * @return a new, equal split instance
+ */
+ public static RocketMQSourceSplit clone(RocketMQSourceSplit split) {
+ return new RocketMQSourceSplit(
+ split.topic,
+ split.brokerName,
+ split.queueId,
+ split.startingOffset,
+ split.stoppingOffset,
+ split.isIncrease);
+ }
+
@Override
public String splitId() {
return topic + SEPARATOR + brokerName + SEPARATOR + queueId;
@@ -93,13 +147,13 @@
@Override
public String toString() {
return String.format(
- "(Topic: %s, BrokerName: %s, QueueId: %d, MinOffset: %d, MaxOffset: %d)",
- topic, brokerName, queueId, startingOffset, stoppingOffset);
+ "(Topic: %s, BrokerName: %s, QueueId: %d, MinOffset: %d, MaxOffset: %d, status: %s)",
+ topic, brokerName, queueId, startingOffset, stoppingOffset, isIncrease);
}
@Override
public int hashCode() {
- return Objects.hash(topic, brokerName, queueId, startingOffset, stoppingOffset);
+ return Objects.hash(topic, brokerName, queueId, startingOffset, stoppingOffset, isIncrease);
}
@Override
@@ -112,6 +166,7 @@
&& brokerName.equals(other.brokerName)
&& queueId == other.queueId
&& startingOffset == other.startingOffset
- && stoppingOffset == other.stoppingOffset;
+ && stoppingOffset == other.stoppingOffset
+ && isIncrease == other.isIncrease;
}
}
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplitState.java b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplitState.java
index ca74d58..5f16a80 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplitState.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplitState.java
@@ -48,6 +48,11 @@
*/
public RocketMQSourceSplit getSourceSplit() {
return new RocketMQSourceSplit(
- getTopic(), getBrokerName(), getQueueId(), getCurrentOffset(), getStoppingOffset());
+ getTopic(),
+ getBrokerName(),
+ getQueueId(),
+ getCurrentOffset(),
+ getStoppingOffset(),
+ getIsIncrease());
}
}
diff --git a/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java b/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
index 08371b3..78ffd17 100644
--- a/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
+++ b/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
@@ -26,7 +26,6 @@
import org.apache.flink.connector.rocketmq.legacy.common.util.TestUtils;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
-import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.common.message.MessageQueue;
import org.junit.Assert;
import org.junit.Test;
diff --git a/src/test/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategyTest.java b/src/test/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategyTest.java
new file mode 100644
index 0000000..f63c8d1
--- /dev/null
+++ b/src/test/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategyTest.java
@@ -0,0 +1,97 @@
+package org.apache.flink.connector.rocketmq.source.enumerator.allocate;
+
+import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests that {@link AverageAllocateStrategy} hands every reader the same number of splits. */
+public class AverageAllocateStrategyTest {
+
+    private static final String BROKER_NAME = "brokerName";
+    private static final String PREFIX_TOPIC = "test-topic-";
+    private static final int NUM_SPLITS = 900;
+    private static final int[] SPLIT_SIZE = {1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000};
+
+    /** A single allocation of {@code NUM_SPLITS} splits starting from position 0. */
+    @Test
+    public void averageAllocateStrategyTest() {
+        AllocateStrategy allocateStrategy = new AverageAllocateStrategy();
+        int parallelism = 3;
+
+        Map<Integer, Set<RocketMQSourceSplit>> result =
+                allocateStrategy.allocate(createSplits(0, NUM_SPLITS), parallelism, 0);
+
+        assertShareOfEachReader(NUM_SPLITS / parallelism, result);
+    }
+
+    /** Three consecutive allocations whose merged result must still be evenly spread. */
+    @Test
+    public void averagesAllocateStrategyTest() {
+        AllocateStrategy allocateStrategy = new AverageAllocateStrategy();
+        int parallelism = 3;
+
+        Map<Integer, Set<RocketMQSourceSplit>> result =
+                allocateStrategy.allocate(createSplits(0, NUM_SPLITS), parallelism, 0);
+        assertShareOfEachReader(NUM_SPLITS / parallelism, result);
+
+        // Second round: 8 more splits, continuing from the NUM_SPLITS already allocated.
+        Map<Integer, Set<RocketMQSourceSplit>> result1 =
+                allocateStrategy.allocate(
+                        createSplits(NUM_SPLITS, 8 + NUM_SPLITS), parallelism, NUM_SPLITS);
+
+        // Third round: 7 more splits, continuing from NUM_SPLITS + 8.
+        Map<Integer, Set<RocketMQSourceSplit>> result2 =
+                allocateStrategy.allocate(
+                        createSplits(8 + NUM_SPLITS, 8 + 7 + NUM_SPLITS),
+                        parallelism,
+                        NUM_SPLITS + 8);
+
+        result1.forEach((k, v) -> result.computeIfAbsent(k, r -> new HashSet<>()).addAll(v));
+        result2.forEach((k, v) -> result.computeIfAbsent(k, r -> new HashSet<>()).addAll(v));
+
+        // No matter how many times it's assigned, it's always equal
+        assertShareOfEachReader((NUM_SPLITS + 8 + 7) / parallelism, result);
+    }
+
+    /** Creates one split per index in {@code [fromInclusive, toExclusive)}. */
+    private static Collection<RocketMQSourceSplit> createSplits(
+            int fromInclusive, int toExclusive) {
+        Collection<RocketMQSourceSplit> splits = new ArrayList<>();
+        for (int i = fromInclusive; i < toExclusive; i++) {
+            splits.add(
+                    new RocketMQSourceSplit(
+                            PREFIX_TOPIC + (i + 1),
+                            BROKER_NAME,
+                            i,
+                            0,
+                            SPLIT_SIZE[i % SPLIT_SIZE.length]));
+        }
+        return splits;
+    }
+
+    /** Asserts that readers 0, 1 and 2 each own exactly {@code expected} splits. */
+    private static void assertShareOfEachReader(
+            int expected, Map<Integer, Set<RocketMQSourceSplit>> allocation) {
+        assertEquals(expected, allocation.get(0).size());
+        assertEquals(expected, allocation.get(1).size());
+        assertEquals(expected, allocation.get(2).size());
+    }
+}
diff --git a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java b/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
index f1fa5af..432994e 100644
--- a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
+++ b/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
@@ -36,4 +36,56 @@
serializer.deserialize(serializer.getVersion(), serializer.serialize(expected));
assertEquals(expected, actual);
}
+
+ /**
+ * Round-trips a split whose increase flag is {@code false} through the serializer, using the
+ * serializer's own version, and checks every field including the flag.
+ */
+ @Test
+ public void testSerializeAndDeserialize() throws IOException {
+ RocketMQPartitionSplitSerializer serializer = new RocketMQPartitionSplitSerializer();
+ RocketMQSourceSplit originalSplit =
+ new RocketMQSourceSplit("testTopic", "testBroker", 0, 100L, 200L, false);
+
+ byte[] serialized = serializer.serialize(originalSplit);
+ RocketMQSourceSplit deserializedSplit =
+ serializer.deserialize(serializer.getVersion(), serialized);
+
+ assertEquals(originalSplit.getTopic(), deserializedSplit.getTopic());
+ assertEquals(originalSplit.getBrokerName(), deserializedSplit.getBrokerName());
+ assertEquals(originalSplit.getQueueId(), deserializedSplit.getQueueId());
+ assertEquals(originalSplit.getStartingOffset(), deserializedSplit.getStartingOffset());
+ assertEquals(originalSplit.getStoppingOffset(), deserializedSplit.getStoppingOffset());
+ assertEquals(originalSplit.getIsIncrease(), deserializedSplit.getIsIncrease());
+ }
+
+ /**
+ * Deserializes with an explicit version of 1 and expects the increase flag to be read back
+ * from the payload.
+ *
+ * <p>NOTE(review): despite the method name, the last assertion only holds if version 1 takes
+ * the branch that reads the flag; confirm which version number is the legacy one.
+ */
+ @Test
+ public void testDeserializeWithOldVersion() throws IOException {
+ RocketMQPartitionSplitSerializer serializer = new RocketMQPartitionSplitSerializer();
+ RocketMQSourceSplit originalSplit =
+ new RocketMQSourceSplit("testTopic", "testBroker", 0, 100L, 200L, false);
+
+ byte[] serialized = serializer.serialize(originalSplit);
+ RocketMQSourceSplit deserializedSplit = serializer.deserialize(1, serialized);
+
+ assertEquals(originalSplit.getTopic(), deserializedSplit.getTopic());
+ assertEquals(originalSplit.getBrokerName(), deserializedSplit.getBrokerName());
+ assertEquals(originalSplit.getQueueId(), deserializedSplit.getQueueId());
+ assertEquals(originalSplit.getStartingOffset(), deserializedSplit.getStartingOffset());
+ assertEquals(originalSplit.getStoppingOffset(), deserializedSplit.getStoppingOffset());
+ assertEquals(originalSplit.getIsIncrease(), deserializedSplit.getIsIncrease());
+ }
+
+ /**
+ * Deserializes with version 0 and expects the increase flag to come back as {@code true} even
+ * though the original split was created with {@code false}.
+ *
+ * <p>NOTE(review): the payload is produced by the current {@code serialize}, which also writes
+ * the flag, so the bytes are not a genuine legacy payload; the test only shows that the
+ * trailing flag is not read for version 0.
+ */
+ @Test
+ public void testDeserializeWithOldVersion1() throws IOException {
+ RocketMQPartitionSplitSerializer serializer = new RocketMQPartitionSplitSerializer();
+ RocketMQSourceSplit originalSplit =
+ new RocketMQSourceSplit("testTopic", "testBroker", 0, 100L, 200L, false);
+
+ byte[] serialized = serializer.serialize(originalSplit);
+ RocketMQSourceSplit deserializedSplit = serializer.deserialize(0, serialized);
+
+ assertEquals(originalSplit.getTopic(), deserializedSplit.getTopic());
+ assertEquals(originalSplit.getBrokerName(), deserializedSplit.getBrokerName());
+ assertEquals(originalSplit.getQueueId(), deserializedSplit.getQueueId());
+ assertEquals(originalSplit.getStartingOffset(), deserializedSplit.getStartingOffset());
+ assertEquals(originalSplit.getStoppingOffset(), deserializedSplit.getStoppingOffset());
+ assertEquals(true, deserializedSplit.getIsIncrease());
+ }
}