blob: 9c8e3d742ac0d7f33c9e1cb8a8f3d637d2a5009a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.pulsar.source.reader.split;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
/**
 * The split reader for a given {@link PulsarPartitionSplit}, it would be closed once the {@link
 * PulsarUnorderedSourceReader} is closed.
 *
 * <p>When auto acknowledgement is disabled in the {@link SourceConfiguration}, every polled
 * message is acknowledged inside a Pulsar {@link Transaction}. The id of that transaction is
 * written into the split state by {@link #snapshotState(long)}.
 *
 * @param <OUT> the type of the pulsar source message that would be serialized to downstream.
 */
@Internal
public class PulsarUnorderedPartitionSplitReader<OUT> extends PulsarPartitionSplitReaderBase<OUT> {
    private static final Logger LOG =
            LoggerFactory.getLogger(PulsarUnorderedPartitionSplitReader.class);

    /** Delay handed to {@code reconsumeLater} when a transactional acknowledgement fails. */
    private static final Duration REDELIVER_TIME = Duration.ofSeconds(3);

    /** Used to abort a restored transaction in {@link #startConsumer}; checked for null there. */
    private final TransactionCoordinatorClient coordinatorClient;

    /** The transaction collecting acknowledgements; null until the first message is polled. */
    @Nullable private Transaction uncommittedTransaction;

    public PulsarUnorderedPartitionSplitReader(
            PulsarClient pulsarClient,
            PulsarAdmin pulsarAdmin,
            Configuration configuration,
            SourceConfiguration sourceConfiguration,
            PulsarDeserializationSchema<OUT> deserializationSchema,
            TransactionCoordinatorClient coordinatorClient) {
        super(pulsarClient, pulsarAdmin, configuration, sourceConfiguration, deserializationSchema);
        this.coordinatorClient = coordinatorClient;
    }

    /**
     * Receives one message and, when auto acknowledgement is disabled, acknowledges it inside the
     * current transaction (creating the transaction on first use).
     *
     * @param timeout the maximum time to wait for a message.
     * @return the received message.
     * @throws ExecutionException if receiving or the transactional acknowledgement failed; in the
     *     latter case the message is scheduled for redelivery first.
     * @throws InterruptedException if the thread was interrupted while waiting.
     * @throws TimeoutException if no message arrived within {@code timeout}.
     */
    @Override
    protected Message<byte[]> pollMessage(Duration timeout)
            throws ExecutionException, InterruptedException, TimeoutException {
        Message<byte[]> message = receiveMessage(timeout);

        if (!sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
            if (uncommittedTransaction == null) {
                // Create a transaction.
                this.uncommittedTransaction = newTransaction();
            }

            try {
                // Add this message into transaction.
                pulsarConsumer
                        .acknowledgeAsync(message.getMessageId(), uncommittedTransaction)
                        .get();
            } catch (InterruptedException e) {
                reconsumeLater(message);
                Thread.currentThread().interrupt();
                throw e;
            } catch (ExecutionException e) {
                reconsumeLater(message);
                throw e;
            }
        }

        return message;
    }

    /**
     * Acknowledges the message when auto acknowledgement is enabled, then releases it. The release
     * happens even if the acknowledgement throws.
     */
    @Override
    protected void finishedPollMessage(Message<byte[]> message) {
        try {
            if (sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
                sneakyClient(() -> pulsarConsumer.acknowledge(message));
            }
        } finally {
            // Release message, regardless of whether the acknowledgement succeeded.
            message.release();
        }
    }

    /**
     * Aborts the transaction recorded in the split (if any) and asks the consumer to redeliver its
     * unacknowledged messages. A failure to abort is logged and not propagated.
     */
    @Override
    protected void startConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
        TxnID uncommittedTransactionId = split.getUncommittedTransactionId();

        // Abort the uncommitted pulsar transaction.
        if (uncommittedTransactionId != null) {
            if (coordinatorClient != null) {
                try {
                    coordinatorClient.abort(uncommittedTransactionId);
                } catch (TransactionCoordinatorClientException e) {
                    LOG.error(
                            "Failed to abort the uncommitted transaction {} when restart the reader",
                            uncommittedTransactionId,
                            e);
                }
            }

            // Redeliver unacknowledged messages because of the message is out of order.
            consumer.redeliverUnacknowledgedMessages();
        }
    }

    /**
     * Builds the split state for a checkpoint. If a transaction is open, its id is stored in the
     * state and a fresh transaction replaces it for subsequent messages.
     *
     * @param checkpointId the checkpoint id; not used by this implementation.
     * @return the state of the registered split.
     */
    public PulsarPartitionSplitState snapshotState(long checkpointId) {
        PulsarPartitionSplitState state = new PulsarPartitionSplitState(registeredSplit);

        // No transaction exists when no message was polled before this checkpoint.
        if (uncommittedTransaction != null) {
            TxnID txnID = PulsarTransactionUtils.getId(uncommittedTransaction);
            this.uncommittedTransaction = newTransaction();
            state.setUncommittedTransactionId(txnID);
        }

        return state;
    }

    /**
     * Waits up to {@code timeout} for the next message and cancels the pending receive request
     * when the wait times out, so that no abandoned future is left behind.
     */
    private Message<byte[]> receiveMessage(Duration timeout)
            throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture<Message<byte[]>> receiveFuture = pulsarConsumer.receiveAsync();
        try {
            return receiveFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // NOTE(review): assumes the Pulsar client discards a cancelled receive future instead
            // of completing it with a later message - confirm against the client version in use.
            if (receiveFuture.cancel(false)) {
                throw e;
            }
            // The future completed between the timeout and the cancel attempt; use its result
            // rather than dropping the message.
            return receiveFuture.get();
        }
    }

    /** Asks the consumer to redeliver the message after {@link #REDELIVER_TIME}. */
    private void reconsumeLater(Message<byte[]> message) {
        sneakyClient(
                () ->
                        pulsarConsumer.reconsumeLater(
                                message, REDELIVER_TIME.toMillis(), TimeUnit.MILLISECONDS));
    }

    /**
     * Opens a new Pulsar transaction with the timeout from the source configuration and blocks
     * until it is created.
     *
     * @throws RuntimeException if the creation failed or the thread was interrupted; the interrupt
     *     flag is restored in the latter case.
     */
    private Transaction newTransaction() {
        long timeoutMillis = sourceConfiguration.getTransactionTimeoutMillis();
        CompletableFuture<Transaction> future =
                sneakyClient(pulsarClient::newTransaction)
                        .withTransactionTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                        .build();

        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}