blob: ddb2e914757b6923ebd2c8884f38a148e385f672 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.common.Handover;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
/**
 * {@link SourceReader} implementation for Pulsar partition splits.
 *
 * <p>For every assigned {@link PulsarPartitionSplit} a {@link PulsarSplitReaderThread} is created,
 * opened and started. Those threads are handed the shared {@link Handover}; {@link #pollNext}
 * drains records from that handover, records the latest consumed message id on the owning split
 * and forwards the payload to the {@link DeserializationSchema}.
 *
 * <p>On {@link #snapshotState} the latest consumed id of every split is remembered per checkpoint
 * and committed through the split reader once {@link #notifyCheckpointComplete} is called for
 * that checkpoint.
 *
 * @param <T> the type of the elements emitted by the deserialization schema
 */
public class PulsarSourceReader<T> implements SourceReader<T, PulsarPartitionSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceReader.class);

    protected final SourceReader.Context context;
    protected final PulsarClientConfig clientConfig;
    protected final PulsarConsumerConfig consumerConfig;
    protected final StartCursor startCursor;
    /** Queue-like hand-off shared with every split reader thread and drained by {@link #pollNext}. */
    protected final Handover<RecordWithSplitId> handover;
    /** Assigned splits keyed by split id; the values carry the latest consumed message id. */
    protected final Map<String, PulsarPartitionSplit> splitStates;
    /** Reader thread of every successfully opened split, keyed by split id. */
    protected final Map<String, PulsarSplitReaderThread> splitReaders;
    /** Cursors captured by {@link #snapshotState}, keyed by checkpoint id and then by split id. */
    protected final SortedMap<Long, Map<String, MessageId>> pendingCursorsToCommit;
    /** Message id reported through {@link #handleNoMoreElements} for each split, keyed by split id. */
    protected final Map<String, MessageId> pendingCursorsToFinish;
    /** Ids of the splits whose final cursor has been committed and whose reader has been closed. */
    protected final Set<String> finishedSplits;
    protected final DeserializationSchema<T> deserialization;
    /**
     * The maximum number of milliseconds to wait for a fetch batch.
     */
    protected final int pollTimeout;
    /** Passed unchanged to every split reader thread (unit not visible here - TODO confirm). */
    protected final long pollInterval;
    /** Upper bound on the number of handover polls performed by one {@link #pollNext} call. */
    protected final int batchSize;
    /** Created in {@link #open()}; {@code null} until then. */
    protected PulsarClient pulsarClient;
    /**
     * Indicating whether the SourceReader will be assigned more splits or not.
     */
    private boolean noMoreSplitsAssignment = false;

    /**
     * Creates a reader with empty split bookkeeping; the Pulsar client is only created in
     * {@link #open()}.
     *
     * @param context reader context, used to signal that no more elements will be produced
     * @param clientConfig configuration used to create the Pulsar client
     * @param consumerConfig consumer configuration handed to every split reader thread
     * @param startCursor start cursor handed to every split reader thread
     * @param deserialization schema used to turn message payloads into output elements
     * @param pollTimeout maximum number of milliseconds to wait for a fetch batch
     * @param pollInterval poll interval handed to every split reader thread
     * @param batchSize maximum number of handover polls per {@link #pollNext} invocation
     */
    public PulsarSourceReader(SourceReader.Context context,
                              PulsarClientConfig clientConfig,
                              PulsarConsumerConfig consumerConfig,
                              StartCursor startCursor,
                              DeserializationSchema<T> deserialization,
                              int pollTimeout,
                              long pollInterval,
                              int batchSize) {
        this.context = context;
        this.clientConfig = clientConfig;
        this.consumerConfig = consumerConfig;
        this.startCursor = startCursor;
        this.deserialization = deserialization;
        this.pollTimeout = pollTimeout;
        this.pollInterval = pollInterval;
        this.batchSize = batchSize;
        this.splitStates = new HashMap<>();
        this.splitReaders = new HashMap<>();
        this.pendingCursorsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
        this.pendingCursorsToFinish = Collections.synchronizedSortedMap(new TreeMap<>());
        this.finishedSplits = new TreeSet<>();
        this.handover = new Handover<>();
    }

    /** Creates the Pulsar client from the client configuration. */
    @Override
    public void open() {
        this.pulsarClient = PulsarConfigUtil.createClient(clientConfig);
    }

    /**
     * Closes every split reader thread and then the Pulsar client.
     *
     * <p>The readers are closed first because they were created from this client (see
     * {@link #createPulsarSplitReaderThread}). A failure while closing one reader no longer
     * prevents the remaining readers and the client from being closed; the first failure is
     * rethrown afterwards with any later ones attached as suppressed exceptions.
     *
     * @throws IOException if closing the Pulsar client fails and no reader failure occurred
     * @throws RuntimeException if closing at least one split reader thread failed
     */
    @Override
    public void close() throws IOException {
        RuntimeException readerFailure = null;
        for (PulsarSplitReaderThread reader : splitReaders.values()) {
            try {
                reader.close();
            } catch (IOException e) {
                RuntimeException wrapped =
                    new RuntimeException("Failed to close the split reader thread.", e);
                if (readerFailure == null) {
                    readerFailure = wrapped;
                } else {
                    readerFailure.addSuppressed(wrapped);
                }
            }
        }

        try {
            if (pulsarClient != null) {
                pulsarClient.close();
            }
        } catch (IOException e) {
            if (readerFailure == null) {
                throw e;
            }
            // Keep the reader failure as the primary error but do not lose the client failure.
            readerFailure.addSuppressed(e);
        }

        if (readerFailure != null) {
            throw readerFailure;
        }
    }

    /**
     * Polls the handover up to {@code batchSize} times and emits every record found.
     *
     * <p>For each record the owning split's latest consumed id is updated and the payload is
     * deserialized while holding the collector's checkpoint lock. Once no more splits will be
     * assigned and every known split is finished, the context is told that no more elements will
     * follow and the loop stops.
     *
     * @param output collector receiving the deserialized elements
     * @throws Exception if polling the handover or deserializing a message fails
     */
    @Override
    public void pollNext(Collector<T> output) throws Exception {
        for (int i = 0; i < batchSize; i++) {
            Optional<RecordWithSplitId> recordWithSplitId = handover.pollNext();
            if (recordWithSplitId.isPresent()) {
                final String splitId = recordWithSplitId.get().getSplitId();
                final Message<byte[]> message = recordWithSplitId.get().getMessage();
                synchronized (output.getCheckpointLock()) {
                    splitStates.get(splitId).setLatestConsumedId(message.getMessageId());
                    deserialization.deserialize(message.getData(), output);
                }
            }
            if (noMoreSplitsAssignment && finishedSplits.size() == splitStates.size()) {
                context.signalNoMoreElement();
                break;
            }
        }
    }

    /**
     * Copies the current split states and remembers their latest consumed ids for the checkpoint.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return copies of all currently known splits
     * @throws Exception declared by the interface; not thrown by the code visible here
     */
    @Override
    public List<PulsarPartitionSplit> snapshotState(long checkpointId) throws Exception {
        List<PulsarPartitionSplit> pendingSplit = splitStates.values().stream()
            .map(PulsarPartitionSplit::copy)
            .collect(Collectors.toList());
        // Perform a snapshot for these splits.
        int size = pendingSplit.size();
        Map<String, MessageId> cursors =
            pendingCursorsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>(size));
        // Put the cursors of the active splits.
        for (PulsarPartitionSplit split : pendingSplit) {
            MessageId latestConsumedId = split.getLatestConsumedId();
            // Splits that have not consumed anything yet have no cursor to commit.
            if (latestConsumedId != null) {
                cursors.put(split.splitId(), latestConsumedId);
            }
        }
        return pendingSplit;
    }

    /**
     * Registers the given splits and opens and starts one reader thread per split.
     *
     * @param splits the splits assigned to this reader
     * @throws RuntimeException if a split reader thread cannot be opened
     */
    @Override
    public void addSplits(List<PulsarPartitionSplit> splits) {
        splits.forEach(split -> {
            // The state must be registered before the thread starts, pollNext looks it up by id.
            splitStates.put(split.splitId(), split);
            PulsarSplitReaderThread splitReaderThread = createPulsarSplitReaderThread(split);
            try {
                splitReaderThread.open();
                splitReaders.put(split.splitId(), splitReaderThread);
                splitReaderThread.start();
            } catch (PulsarClientException e) {
                throw new RuntimeException("Failed to start the split reader thread.", e);
            }
        });
    }

    /**
     * Builds the (not yet opened or started) reader thread for one split.
     *
     * @param split the split the thread will read
     * @return a new reader thread wired to this reader, its client, configuration and handover
     */
    protected PulsarSplitReaderThread createPulsarSplitReaderThread(PulsarPartitionSplit split) {
        return new PulsarSplitReaderThread(this,
            split,
            pulsarClient,
            consumerConfig,
            pollTimeout,
            pollInterval,
            startCursor,
            handover);
    }

    /**
     * Records the message id at which the given split has no more elements.
     *
     * <p>The split is only marked finished later, in {@link #notifyCheckpointComplete}, once a
     * committed cursor compares equal to this id.
     *
     * @param splitId id of the split that reported the event
     * @param messageId message id reported together with the event
     */
    public void handleNoMoreElements(String splitId, MessageId messageId) {
        LOG.info("Reader received the split {} NoMoreElements event.", splitId);
        pendingCursorsToFinish.put(splitId, messageId);
    }

    /** Remembers that no further splits will be assigned to this reader. */
    @Override
    public void handleNoMoreSplits() {
        LOG.info("Reader received NoMoreSplits event.");
        this.noMoreSplitsAssignment = true;
    }

    /**
     * Commits the cursors that were captured for the completed checkpoint.
     *
     * <p>Cursors captured for older checkpoints are discarded as well, so that snapshots whose
     * completion notification never arrives do not accumulate. A split whose committed cursor
     * equals the id recorded by {@link #handleNoMoreElements} is marked finished and its reader
     * is closed.
     *
     * @param checkpointId id of the checkpoint that completed
     * @throws RuntimeException if closing the reader of a finished split fails
     * @throws Exception declared by the interface
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        LOG.debug("Committing cursors for checkpoint {}", checkpointId);
        Map<String, MessageId> pendingCursors = pendingCursorsToCommit.remove(checkpointId);
        // Drop the snapshots of all earlier checkpoints; headMap is exclusive of checkpointId.
        pendingCursorsToCommit.headMap(checkpointId).clear();
        if (pendingCursors == null) {
            LOG.debug(
                "Cursors for checkpoint {} either do not exist or have already been committed.",
                checkpointId);
            return;
        }
        pendingCursors.forEach((splitId, messageId) -> {
            if (finishedSplits.contains(splitId)) {
                return;
            }
            PulsarSplitReaderThread reader = splitReaders.get(splitId);
            if (reader == null) {
                // The split was registered but its reader never opened successfully.
                LOG.warn("No split reader found for split {}, skipping the cursor commit.", splitId);
                return;
            }
            reader.committingCursor(messageId);
            // Single lookup: avoids a containsKey/get race and a null value dereference.
            MessageId finishCursor = pendingCursorsToFinish.get(splitId);
            if (finishCursor != null && finishCursor.compareTo(messageId) == 0) {
                finishedSplits.add(splitId);
                try {
                    reader.close();
                } catch (IOException e) {
                    throw new RuntimeException("Failed to close the split reader thread.", e);
                }
            }
        });
    }
}