blob: 38aa132a6aeb35400622a3c82a820800edb3348d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink.source.enumerator;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_EARLIEST;
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_TIMESTAMP;
/** The enumerator class for RocketMQ source. */
@Internal
public class RocketMQSourceEnumerator
        implements SplitEnumerator<RocketMQPartitionSplit, RocketMQSourceEnumState> {

    private static final Logger LOG = LoggerFactory.getLogger(RocketMQSourceEnumerator.class);

    /** Cache of the start offset resolved for each discovered message queue. */
    private final Map<MessageQueue, Long> offsetTable = new HashMap<>();
    /** How the initial consume offset is determined (earliest / latest / timestamp / stored). */
    private final String consumerOffsetMode;
    /** Timestamp used when {@code consumerOffsetMode} is {@code CONSUMER_OFFSET_TIMESTAMP}. */
    private final long consumerOffsetTimestamp;
    /** The topic used for this RocketMQSource. */
    private final String topic;
    /** The consumer group used for this RocketMQSource. */
    private final String consumerGroup;
    /** The name server address used for this RocketMQSource. */
    private final String nameServerAddress;
    /** The stop timestamp for this RocketMQSource. */
    private final long stopInMs;
    /** The start offset for this RocketMQSource; values > 0 override the offset mode. */
    private final long startOffset;
    /** The partition discovery interval for this RocketMQSource; < 0 disables rediscovery. */
    private final long partitionDiscoveryIntervalMs;
    /** The boundedness of this RocketMQSource. */
    private final Boundedness boundedness;
    /** The accessKey used for this RocketMQSource. */
    private final String accessKey;
    /** The secretKey used for this RocketMQSource. */
    private final String secretKey;

    private final SplitEnumeratorContext<RocketMQPartitionSplit> context;

    // The internal states of the enumerator.
    /**
     * Partitions (topic, broker, queueId) seen so far. This set is only accessed by the
     * partition discovery callable in the callAsync() method, i.e. the worker thread.
     */
    private final Set<Tuple3<String, String, Integer>> discoveredPartitions;
    /** The current assignment by reader id. Only accessed by the coordinator thread. */
    private final Map<Integer, List<RocketMQPartitionSplit>> readerIdToSplitAssignments;
    /**
     * The discovered and initialized partition splits that are waiting for owner reader to be
     * ready.
     */
    private final Map<Integer, Set<RocketMQPartitionSplit>> pendingPartitionSplitAssignment;

    // Lazily instantiated or mutable fields.
    private DefaultMQPullConsumer consumer;
    private boolean noMoreNewPartitionSplits = false;

    /**
     * Creates an enumerator with no pre-existing split assignment (fresh start, i.e. not
     * restored from a checkpoint).
     */
    public RocketMQSourceEnumerator(
            String topic,
            String consumerGroup,
            String nameServerAddress,
            String accessKey,
            String secretKey,
            long stopInMs,
            long startOffset,
            long partitionDiscoveryIntervalMs,
            Boundedness boundedness,
            SplitEnumeratorContext<RocketMQPartitionSplit> context,
            String consumerOffsetMode,
            long consumerOffsetTimestamp) {
        this(
                topic,
                consumerGroup,
                nameServerAddress,
                accessKey,
                secretKey,
                stopInMs,
                startOffset,
                partitionDiscoveryIntervalMs,
                boundedness,
                context,
                new HashMap<>(),
                consumerOffsetMode,
                consumerOffsetTimestamp);
    }

    /**
     * Creates an enumerator, optionally restoring a previous split assignment (e.g. from a
     * checkpointed {@link RocketMQSourceEnumState}). Every split in {@code
     * currentSplitsAssignments} is registered as an already-discovered partition so it will
     * not be assigned a second time.
     */
    public RocketMQSourceEnumerator(
            String topic,
            String consumerGroup,
            String nameServerAddress,
            String accessKey,
            String secretKey,
            long stopInMs,
            long startOffset,
            long partitionDiscoveryIntervalMs,
            Boundedness boundedness,
            SplitEnumeratorContext<RocketMQPartitionSplit> context,
            Map<Integer, List<RocketMQPartitionSplit>> currentSplitsAssignments,
            String consumerOffsetMode,
            long consumerOffsetTimestamp) {
        this.topic = topic;
        this.consumerGroup = consumerGroup;
        this.nameServerAddress = nameServerAddress;
        this.accessKey = accessKey;
        this.secretKey = secretKey;
        this.stopInMs = stopInMs;
        this.startOffset = startOffset;
        this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
        this.boundedness = boundedness;
        this.context = context;
        this.discoveredPartitions = new HashSet<>();
        this.readerIdToSplitAssignments = new HashMap<>(currentSplitsAssignments);
        // Seed the discovered set from the restored assignment so rediscovery does not
        // re-assign splits that readers already own.
        this.readerIdToSplitAssignments.forEach(
                (reader, splits) ->
                        splits.forEach(
                                s ->
                                        discoveredPartitions.add(
                                                new Tuple3<>(
                                                        s.getTopic(),
                                                        s.getBroker(),
                                                        s.getPartition()))));
        this.pendingPartitionSplitAssignment = new HashMap<>();
        this.consumerOffsetMode = consumerOffsetMode;
        this.consumerOffsetTimestamp = consumerOffsetTimestamp;
    }

    @Override
    public void start() {
        initialRocketMQConsumer();
        LOG.info(
                "Starting the RocketMQSourceEnumerator for consumer group {} "
                        + "with partition discovery interval of {} ms.",
                consumerGroup,
                partitionDiscoveryIntervalMs);
        // Discover partitions on the worker thread, handle the result on the coordinator
        // thread, starting immediately and repeating every partitionDiscoveryIntervalMs.
        context.callAsync(
                this::discoverAndInitializePartitionSplit,
                this::handlePartitionSplitChanges,
                0,
                partitionDiscoveryIntervalMs);
    }

    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        // the RocketMQ source pushes splits eagerly, rather than act upon split requests
    }

    @Override
    public void addSplitsBack(List<RocketMQPartitionSplit> splits, int subtaskId) {
        addPartitionSplitChangeToPendingAssignments(splits);
        assignPendingPartitionSplits();
    }

    @Override
    public void addReader(int subtaskId) {
        LOG.debug(
                "Adding reader {} to RocketMQSourceEnumerator for consumer group {}.",
                subtaskId,
                consumerGroup);
        assignPendingPartitionSplits();
        if (boundedness == Boundedness.BOUNDED) {
            // for RocketMQ bounded source, send this signal to ensure the task can end after
            // all the splits assigned are completed.
            context.signalNoMoreSplits(subtaskId);
        }
    }

    @Override
    public RocketMQSourceEnumState snapshotState(long checkpointId) {
        return new RocketMQSourceEnumState(readerIdToSplitAssignments);
    }

    @Override
    public void close() {
        if (consumer != null) {
            consumer.shutdown();
        }
    }

    // ----------------- private methods -------------------

    /**
     * Fetches the current message queues of the topic and builds splits for queues that have
     * not been seen before. Runs on the worker thread.
     *
     * @return the newly discovered partition splits (empty if nothing changed).
     * @throws MQClientException if the queue lookup fails.
     */
    private Set<RocketMQPartitionSplit> discoverAndInitializePartitionSplit()
            throws MQClientException {
        Set<Tuple3<String, String, Integer>> newPartitions = new HashSet<>();
        // Queues previously seen but absent from this fetch end up remaining in this set;
        // they are currently only tracked, not acted upon.
        Set<Tuple3<String, String, Integer>> removedPartitions =
                new HashSet<>(discoveredPartitions);
        Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(topic);
        Set<RocketMQPartitionSplit> result = new HashSet<>();
        for (MessageQueue messageQueue : messageQueues) {
            Tuple3<String, String, Integer> topicPartition =
                    new Tuple3<>(
                            messageQueue.getTopic(),
                            messageQueue.getBrokerName(),
                            messageQueue.getQueueId());
            // remove() returning false means the queue was not previously known: it is new.
            if (!removedPartitions.remove(topicPartition)) {
                newPartitions.add(topicPartition);
                result.add(
                        new RocketMQPartitionSplit(
                                topicPartition.f0,
                                topicPartition.f1,
                                topicPartition.f2,
                                getOffsetByMessageQueue(messageQueue),
                                stopInMs));
            }
        }
        discoveredPartitions.addAll(newPartitions);
        return result;
    }

    /**
     * Callback for the async partition discovery. This method should only be invoked in the
     * coordinator executor thread.
     */
    private void handlePartitionSplitChanges(
            Set<RocketMQPartitionSplit> partitionSplits, Throwable t) {
        if (t != null) {
            throw new FlinkRuntimeException("Failed to handle partition splits change", t);
        }
        if (partitionDiscoveryIntervalMs < 0) {
            // Discovery is disabled, so this one-shot result is final.
            LOG.debug("Partition discovery is disabled - not looking for new partitions.");
            noMoreNewPartitionSplits = true;
        }
        addPartitionSplitChangeToPendingAssignments(partitionSplits);
        assignPendingPartitionSplits();
    }

    /**
     * Buckets each new split under its owning reader, to be handed out once that reader
     * registers. This method should only be invoked in the coordinator executor thread.
     */
    private void addPartitionSplitChangeToPendingAssignments(
            Collection<RocketMQPartitionSplit> newPartitionSplits) {
        int numReaders = context.currentParallelism();
        for (RocketMQPartitionSplit split : newPartitionSplits) {
            int ownerReader =
                    getSplitOwner(
                            split.getTopic(), split.getBroker(), split.getPartition(), numReaders);
            pendingPartitionSplitAssignment
                    .computeIfAbsent(ownerReader, r -> new HashSet<>())
                    .add(split);
        }
        LOG.debug(
                "Assigned {} to {} readers of consumer group {}.",
                newPartitionSplits,
                numReaders,
                consumerGroup);
    }

    /**
     * Pushes pending splits to the readers that are registered, records the assignment for
     * checkpointing, and signals end-of-splits when discovery is finished. This method should
     * only be invoked in the coordinator executor thread.
     */
    private void assignPendingPartitionSplits() {
        Map<Integer, List<RocketMQPartitionSplit>> incrementalAssignment = new HashMap<>();
        pendingPartitionSplitAssignment.forEach(
                (ownerReader, pendingSplits) -> {
                    if (!pendingSplits.isEmpty()
                            && context.registeredReaders().containsKey(ownerReader)) {
                        // The owner reader is ready, assign the split to the owner reader.
                        incrementalAssignment
                                .computeIfAbsent(ownerReader, r -> new ArrayList<>())
                                .addAll(pendingSplits);
                    }
                });
        if (incrementalAssignment.isEmpty()) {
            // No assignment is made.
            return;
        }
        LOG.info("Assigning splits to readers {}", incrementalAssignment);
        context.assignSplits(new SplitsAssignment<>(incrementalAssignment));
        incrementalAssignment.forEach(
                (readerOwner, newPartitionSplits) -> {
                    // Update the split assignment.
                    readerIdToSplitAssignments
                            .computeIfAbsent(readerOwner, r -> new ArrayList<>())
                            .addAll(newPartitionSplits);
                    // Clear the pending splits for the reader owner.
                    pendingPartitionSplitAssignment.remove(readerOwner);
                    // Sends NoMoreSplitsEvent to the readers if there is no more partition
                    // splits to be assigned.
                    if (noMoreNewPartitionSplits) {
                        LOG.debug(
                                "No more RocketMQPartitionSplits to assign. Sending NoMoreSplitsEvent to the readers "
                                        + "in consumer group {}.",
                                consumerGroup);
                        context.signalNoMoreSplits(readerOwner);
                    }
                });
    }

    /**
     * Resolves the start offset for a message queue, caching the result. An explicit {@code
     * startOffset > 0} wins; otherwise the configured offset mode decides (earliest, latest,
     * by timestamp, or the offset stored on the broker for this consumer group).
     *
     * @throws MQClientException if an offset lookup fails.
     * @throws IllegalArgumentException if the stored offset is negative, which indicates an
     *     unrecognized offset mode with no usable broker-side offset.
     */
    private long getOffsetByMessageQueue(MessageQueue mq) throws MQClientException {
        Long offset = offsetTable.get(mq);
        if (offset == null) {
            if (startOffset > 0) {
                offset = startOffset;
            } else {
                switch (consumerOffsetMode) {
                    case CONSUMER_OFFSET_EARLIEST:
                        offset = consumer.minOffset(mq);
                        break;
                    case CONSUMER_OFFSET_LATEST:
                        offset = consumer.maxOffset(mq);
                        break;
                    case CONSUMER_OFFSET_TIMESTAMP:
                        offset = consumer.searchOffset(mq, consumerOffsetTimestamp);
                        break;
                    default:
                        offset = consumer.fetchConsumeOffset(mq, false);
                        if (offset < 0) {
                            throw new IllegalArgumentException(
                                    "Unknown value for CONSUMER_OFFSET_RESET_TO.");
                        }
                }
            }
        }
        offsetTable.put(mq, offset);
        // Return the computed value directly rather than re-reading the map.
        return offset;
    }

    /**
     * Creates and starts the pull consumer used for partition discovery and offset lookups,
     * using ACL credentials when both keys are configured.
     *
     * @throws FlinkRuntimeException if the consumer fails to start; failing fast here is
     *     better than letting the scheduled partition discovery fail later against a consumer
     *     that never started.
     */
    private void initialRocketMQConsumer() {
        try {
            if (!StringUtils.isNullOrWhitespaceOnly(accessKey)
                    && !StringUtils.isNullOrWhitespaceOnly(secretKey)) {
                AclClientRPCHook aclClientRPCHook =
                        new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
                consumer = new DefaultMQPullConsumer(consumerGroup, aclClientRPCHook);
            } else {
                consumer = new DefaultMQPullConsumer(consumerGroup);
            }
            consumer.setNamesrvAddr(nameServerAddress);
            // Unique instance name so multiple enumerators in one JVM do not collide.
            consumer.setInstanceName(
                    String.join(
                            "||",
                            ManagementFactory.getRuntimeMXBean().getName(),
                            topic,
                            consumerGroup,
                            "" + System.nanoTime()));
            consumer.start();
        } catch (MQClientException e) {
            LOG.error("Failed to initial RocketMQ consumer.", e);
            if (consumer != null) {
                consumer.shutdown();
            }
            // Propagate instead of swallowing: the enumerator is unusable without a started
            // consumer, and continuing would surface as a confusing NPE/client error later.
            throw new FlinkRuntimeException("Failed to initial RocketMQ consumer.", e);
        }
    }

    /**
     * Returns the index of the target subtask that a specific RocketMQ partition should be
     * assigned to.
     *
     * <p>The resulting distribution of partitions of a single topic has the following contract:
     *
     * <ul>
     *   <li>1. Uniformly distributed across subtasks
     *   <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending
     *       subtask indices) by using the partition id as the offset from a starting index
     *       (i.e., the index of the subtask which partition 0 of the topic will be assigned to,
     *       determined using the topic name).
     * </ul>
     *
     * @param topic the RocketMQ topic assigned.
     * @param broker the RocketMQ broker assigned.
     * @param partition the RocketMQ partition to assign.
     * @param numReaders the total number of readers.
     * @return the id of the subtask that owns the split.
     */
    @VisibleForTesting
    static int getSplitOwner(String topic, String broker, int partition, int numReaders) {
        // Mask with 0x7FFFFFFF to force a non-negative value before the modulo.
        int startIndex = (((topic + "-" + broker).hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        // here, the assumption is that the id of RocketMQ partitions are always ascending
        // starting from 0, and therefore can be used directly as the offset clockwise from the
        // start index
        return (startIndex + partition) % numReaders;
    }
}