blob: a192271b13609caf57351bac9c706cf4fe9bb6b5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.SubscriptionStartCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.LatestMessageStopCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.StopCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.PulsarDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.TopicPatternDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
 * Split enumerator of the Pulsar source.
 *
 * <p>It discovers topic partitions through a {@link PulsarDiscoverer}, turns every partition it has
 * not handed out yet into a {@link PulsarPartitionSplit}, and pushes each split to the reader that
 * owns it (see {@link #getSplitOwner(TopicPartition, int)}). Readers do not pull splits, so
 * {@link #handleSplitRequest(int)} does nothing.
 *
 * <p>Discovery runs once when {@code partitionDiscoveryIntervalMs <= 0}; otherwise it runs
 * periodically on a dedicated daemon thread. Because that thread and the engine's calls into this
 * enumerator share the assignment state, every method that reads or writes that state
 * synchronizes on this instance.
 */
public class PulsarSplitEnumerator implements SourceSplitEnumerator<PulsarPartitionSplit, PulsarSplitEnumeratorState> {

    private static final Logger LOG = LoggerFactory.getLogger(PulsarSplitEnumerator.class);

    private final SourceSplitEnumerator.Context<PulsarPartitionSplit> context;
    private final PulsarAdminConfig adminConfig;
    private final PulsarDiscoverer partitionDiscoverer;
    /** Discovery period in milliseconds; a value {@code <= 0} runs discovery only once. */
    private final long partitionDiscoveryIntervalMs;
    private final StartCursor startCursor;
    private final StopCursor stopCursor;

    /**
     * The Pulsar subscription name used by this source.
     */
    private final String subscriptionName;

    /**
     * Partitions that have been assigned to readers. Guarded by {@code this}.
     */
    private final Set<TopicPartition> assignedPartitions;

    /**
     * The discovered and initialized partition splits that are waiting for their owner reader to
     * be ready, keyed by the owner reader's subtask id. Guarded by {@code this}.
     */
    private final Map<Integer, Set<PulsarPartitionSplit>> pendingPartitionSplits;

    private PulsarAdmin pulsarAdmin;

    // Becomes true once periodic partition discovery is disabled AND the one-off discovery round
    // has finished, i.e. no further splits will be produced. Guarded by this.
    private boolean noMoreNewPartitionSplits = false;

    /** Runs periodic discovery; stays {@code null} when periodic discovery is disabled. */
    private ScheduledThreadPoolExecutor executor = null;

    /**
     * Creates an enumerator that starts with no partitions marked as assigned.
     *
     * @param context enumerator context used to assign splits and query readers
     * @param adminConfig configuration for the Pulsar admin client created in {@link #open()}
     * @param partitionDiscoverer source of the subscribed topic partitions
     * @param partitionDiscoveryIntervalMs discovery period in ms; {@code <= 0} discovers once
     * @param startCursor start position; a {@link SubscriptionStartCursor} also ensures the
     *     subscription exists for every new partition
     * @param stopCursor stop position, copied for each new split
     * @param subscriptionName Pulsar subscription name
     */
    public PulsarSplitEnumerator(
            SourceSplitEnumerator.Context<PulsarPartitionSplit> context,
            PulsarAdminConfig adminConfig,
            PulsarDiscoverer partitionDiscoverer,
            long partitionDiscoveryIntervalMs,
            StartCursor startCursor,
            StopCursor stopCursor,
            String subscriptionName) {
        this(
                context,
                adminConfig,
                partitionDiscoverer,
                partitionDiscoveryIntervalMs,
                startCursor,
                stopCursor,
                subscriptionName,
                Collections.emptySet());
    }

    /**
     * Creates an enumerator that treats {@code assignedPartitions} as already handed out, so they
     * are never assigned again.
     *
     * @param assignedPartitions partitions already assigned to readers; copied defensively
     * @throws IllegalArgumentException if a {@link TopicPatternDiscoverer} is combined with
     *     periodic discovery and a bounded stop cursor
     */
    public PulsarSplitEnumerator(SourceSplitEnumerator.Context<PulsarPartitionSplit> context,
                                 PulsarAdminConfig adminConfig,
                                 PulsarDiscoverer partitionDiscoverer,
                                 long partitionDiscoveryIntervalMs,
                                 StartCursor startCursor,
                                 StopCursor stopCursor,
                                 String subscriptionName,
                                 Set<TopicPartition> assignedPartitions) {
        if ((partitionDiscoverer instanceof TopicPatternDiscoverer)
                && partitionDiscoveryIntervalMs > 0
                && Boundedness.BOUNDED == stopCursor.getBoundedness()) {
            throw new IllegalArgumentException("Bounded streams do not support dynamic partition discovery.");
        }
        this.context = context;
        this.adminConfig = adminConfig;
        this.partitionDiscoverer = partitionDiscoverer;
        this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
        this.startCursor = startCursor;
        this.stopCursor = stopCursor;
        this.subscriptionName = subscriptionName;
        this.assignedPartitions = new HashSet<>(assignedPartitions);
        this.pendingPartitionSplits = new HashMap<>();
    }

    /** Creates the Pulsar admin client used for discovery and split preparation. */
    @Override
    public void open() {
        this.pulsarAdmin = PulsarConfigUtil.createAdmin(adminConfig);
    }

    /**
     * Starts partition discovery: once, synchronously, when the interval is {@code <= 0};
     * otherwise periodically (first run immediately) on a daemon thread.
     */
    @Override
    public void run() throws Exception {
        if (partitionDiscoveryIntervalMs > 0) {
            executor = new ScheduledThreadPoolExecutor(1, runnable -> {
                Thread thread = new Thread(runnable);
                thread.setDaemon(true);
                thread.setName("pulsar-split-discovery-executor");
                return thread;
            });
            executor.scheduleAtFixedRate(() -> {
                // An exception escaping a periodic task silently cancels all later runs, so log
                // the failure and let the next round retry instead.
                try {
                    discoverySplits();
                } catch (RuntimeException e) {
                    LOG.error("Pulsar partition discovery failed, retrying in {} ms.",
                            partitionDiscoveryIntervalMs, e);
                }
            }, 0, partitionDiscoveryIntervalMs, TimeUnit.MILLISECONDS);
        } else {
            discoverySplits();
        }
    }

    /** Fetches the currently subscribed partitions and reconciles them with the assignment state. */
    private void discoverySplits() {
        // The admin round-trip happens outside the lock so engine calls are not blocked by it.
        Set<TopicPartition> subscribedTopicPartitions = partitionDiscoverer.getSubscribedTopicPartitions(pulsarAdmin);
        checkPartitionChanges(subscribedTopicPartitions);
    }

    /**
     * Creates splits for partitions not seen before, queues them for their owners, and assigns
     * whatever registered readers can take now.
     *
     * <p>When discovery is disabled, this round is the last one. Registered readers are therefore
     * given the chance to receive the no-more-splits signal, even when no new partition was found.
     */
    private synchronized void checkPartitionChanges(Set<TopicPartition> fetchedPartitions) {
        final Set<TopicPartition> newPartitions = getNewPartitions(fetchedPartitions);
        if (partitionDiscoveryIntervalMs <= 0 && !noMoreNewPartitionSplits) {
            LOG.debug("Partition discovery is disabled.");
            noMoreNewPartitionSplits = true;
        }
        if (!newPartitions.isEmpty()) {
            List<PulsarPartitionSplit> newSplits = newPartitions.stream()
                    .map(this::createPulsarPartitionSplit)
                    .collect(Collectors.toList());
            addPartitionSplitChangeToPendingAssignments(newSplits);
        } else if (!noMoreNewPartitionSplits) {
            // Nothing new and more discovery rounds will follow: nothing to assign or signal.
            return;
        }
        assignPendingPartitionSplits(context.registeredReaders());
    }

    /**
     * Builds the split for {@code partition} with its own copy of the stop cursor.
     *
     * <p>A {@link LatestMessageStopCursor} is prepared against the partition. A
     * {@link SubscriptionStartCursor} has the subscription ensured for the partition.
     */
    private PulsarPartitionSplit createPulsarPartitionSplit(TopicPartition partition) {
        StopCursor partitionStopCursor = stopCursor.copy();
        PulsarPartitionSplit split = new PulsarPartitionSplit(partition, partitionStopCursor);
        if (partitionStopCursor instanceof LatestMessageStopCursor) {
            ((LatestMessageStopCursor) partitionStopCursor).prepare(pulsarAdmin, partition);
        }
        if (startCursor instanceof SubscriptionStartCursor) {
            ((SubscriptionStartCursor) startCursor).ensureSubscription(subscriptionName, partition, pulsarAdmin);
        }
        return split;
    }

    /**
     * Returns the partitions of {@code fetchedPartitions} that are neither assigned nor pending.
     * The argument is left untouched; the discoverer may hand out an immutable set.
     */
    private Set<TopicPartition> getNewPartitions(Set<TopicPartition> fetchedPartitions) {
        final Set<TopicPartition> newPartitions = new HashSet<>(fetchedPartitions);
        final Consumer<TopicPartition> dedup = newPartitions::remove;
        assignedPartitions.forEach(dedup);
        pendingPartitionSplits.values().forEach(
                splits -> splits.forEach(split -> dedup.accept(split.getPartition())));
        if (!newPartitions.isEmpty()) {
            LOG.info("Discovered new partitions: {}", newPartitions);
        }
        return newPartitions;
    }

    /** Queues each split for its owner reader under the current parallelism. Caller holds the lock. */
    private void addPartitionSplitChangeToPendingAssignments(
            Collection<PulsarPartitionSplit> newPartitionSplits) {
        int numReaders = context.currentParallelism();
        for (PulsarPartitionSplit split : newPartitionSplits) {
            int ownerReader = getSplitOwner(split.getPartition(), numReaders);
            pendingPartitionSplits
                    .computeIfAbsent(ownerReader, r -> new HashSet<>())
                    .add(split);
        }
        LOG.debug(
                "Assigned {} to {} readers of subscription {}.",
                newPartitionSplits,
                numReaders,
                subscriptionName);
    }

    /**
     * Returns the reader index, in {@code [0, numReaders)}, that owns {@code tp}.
     *
     * <p>Partitions of one topic are laid out clockwise from a start index derived from the topic
     * name.
     */
    @SuppressWarnings("checkstyle:MagicNumber")
    static int getSplitOwner(TopicPartition tp, int numReaders) {
        int startIndex = ((tp.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        // Here the assumption is that the ids of pulsar partitions are ascending from 0, so they
        // can be used directly as the offset clockwise from the start index. floorMod (instead of
        // %) keeps the owner non-negative if a partition id is negative; presumably -1 is used for
        // non-partitioned topics - TODO confirm against TopicPartition. With %, such a split would
        // be queued for reader -1, which never registers.
        return Math.floorMod(startIndex + tp.getPartition(), numReaders);
    }

    /**
     * Hands every pending split owned by one of {@code pendingReaders} to that reader and marks its
     * partition as assigned. Once discovery is over for a bounded stream, each of these readers is
     * also told that no more splits will come. Caller holds the lock.
     */
    private void assignPendingPartitionSplits(Set<Integer> pendingReaders) {
        // Check if there's any pending splits for given readers
        for (int pendingReader : pendingReaders) {
            // Remove pending assignment for the reader
            final Set<PulsarPartitionSplit> pendingAssignmentForReader =
                    pendingPartitionSplits.remove(pendingReader);
            if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) {
                // Mark pending partitions as already assigned
                pendingAssignmentForReader.forEach(
                        split -> assignedPartitions.add(split.getPartition()));
                // Assign pending splits to reader
                LOG.info("Assigning splits to readers {}", pendingAssignmentForReader);
                context.assignSplit(pendingReader, new ArrayList<>(pendingAssignmentForReader));
            }
        }
        // If periodic partition discovery is disabled and the one-off discovery has finished,
        // signal NoMoreSplitsEvent to these readers.
        if (noMoreNewPartitionSplits && stopCursor.getBoundedness() == Boundedness.BOUNDED) {
            LOG.debug(
                    "No more PulsarPartitionSplits to assign. Sending NoMoreSplitsEvent to reader {}"
                            + " in subscription {}.",
                    pendingReaders,
                    subscriptionName);
            pendingReaders.forEach(context::signalNoMoreSplits);
        }
    }

    /**
     * Stops periodic discovery, then closes the admin client. The executor goes first so no
     * discovery run is scheduled against a closed client, and a failing admin close cannot leave
     * the executor running.
     */
    @Override
    public void close() throws IOException {
        if (executor != null) {
            // Interrupt an in-flight round as well: its result is no longer needed.
            executor.shutdownNow();
        }
        if (pulsarAdmin != null) {
            pulsarAdmin.close();
        }
    }

    /**
     * Re-queues splits returned by a failed reader. If {@code subtaskId} is already registered
     * again, its pending splits are assigned immediately.
     */
    @Override
    public synchronized void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
        addPartitionSplitChangeToPendingAssignments(splits);
        // If the failed subtask has already restarted, we need to assign pending splits to it
        if (context.registeredReaders().contains(subtaskId)) {
            assignPendingPartitionSplits(Collections.singleton(subtaskId));
        }
    }

    /** Returns the number of discovered splits still waiting for their owner reader. */
    @Override
    public synchronized int currentUnassignedSplitSize() {
        // Sum the per-reader queues; the map's own size only counts readers with pending work.
        return pendingPartitionSplits.values().stream().mapToInt(Set::size).sum();
    }

    @Override
    public void handleSplitRequest(int subtaskId) {
        // Do nothing because Pulsar source push split.
    }

    /** Assigns any splits already queued for the newly registered reader. */
    @Override
    public synchronized void registerReader(int subtaskId) {
        LOG.debug(
                "Adding reader {} to PulsarSourceEnumerator for subscription {}.",
                subtaskId,
                subscriptionName);
        assignPendingPartitionSplits(Collections.singleton(subtaskId));
    }

    /**
     * Snapshots the assigned partitions. The set is copied so the snapshot is not changed by
     * later assignments.
     */
    @Override
    public synchronized PulsarSplitEnumeratorState snapshotState(long checkpointId) throws Exception {
        return new PulsarSplitEnumeratorState(new HashSet<>(assignedPartitions));
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // nothing
    }
}