/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.pulsar.sink.writer;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextImpl;
import org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer;
import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
import org.apache.flink.connector.pulsar.sink.writer.topic.TopicProducerRegister;
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.flink.shaded.guava30.com.google.common.base.Strings;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.Collections.emptyList;
import static org.apache.flink.util.IOUtils.closeAll;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * This class is responsible for writing records to a Pulsar topic and for handling the
 * different delivery {@link DeliveryGuarantee}s.
*
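 * <p>A minimal sketch of how this writer is typically wired up. It is created internally by the
 * Pulsar sink, so users normally configure the sink instead of this class; the builder options
 * below follow the public {@code PulsarSink} API, and the service/admin URLs are placeholders:
 *
 * <pre>{@code
 * PulsarSink<String> sink =
 *         PulsarSink.builder()
 *                 .setServiceUrl("pulsar://localhost:6650")
 *                 .setAdminUrl("http://localhost:8080")
 *                 .setTopics("persistent://public/default/my-topic")
 *                 .setSerializationSchema(
 *                         PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
 *                 .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
 *                 .build();
 * stream.sinkTo(sink); // Flink creates a PulsarWriter per parallel subtask.
 * }</pre>
 *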
* @param <IN> The type of the input elements.
*/
@Internal
public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommittable> {

    private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class);

private final PulsarSerializationSchema<IN> serializationSchema;
private final TopicMetadataListener metadataListener;
private final TopicRouter<IN> topicRouter;
private final MessageDelayer<IN> messageDelayer;
private final DeliveryGuarantee deliveryGuarantee;
private final PulsarSinkContext sinkContext;
private final MailboxExecutor mailboxExecutor;
private final TopicProducerRegister producerRegister;
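    // Number of in-flight asynchronous sends; flush() waits until this drops back to zero.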
    private final AtomicLong pendingMessages = new AtomicLong(0);

/**
* Constructor creating a Pulsar writer.
*
     * <p>It will throw a {@link FlinkRuntimeException} if {@link
     * PulsarSerializationSchema#open(InitializationContext, PulsarSinkContext, SinkConfiguration)}
     * fails.
*
     * @param sinkConfiguration The configuration for the Pulsar producer.
     * @param serializationSchema The schema that transforms incoming records into Pulsar messages.
     * @param metadataListener The listener for querying topic metadata.
     * @param topicRouter The router that chooses the target topic for each incoming record.
     * @param messageDelayer The delayer that decides when a message should be delivered.
     * @param initContext Context providing information about the runtime environment.
*/
public PulsarWriter(
SinkConfiguration sinkConfiguration,
PulsarSerializationSchema<IN> serializationSchema,
TopicMetadataListener metadataListener,
TopicRouter<IN> topicRouter,
MessageDelayer<IN> messageDelayer,
InitContext initContext) {
checkNotNull(sinkConfiguration);
this.serializationSchema = checkNotNull(serializationSchema);
this.metadataListener = checkNotNull(metadataListener);
this.topicRouter = checkNotNull(topicRouter);
this.messageDelayer = checkNotNull(messageDelayer);
checkNotNull(initContext);
this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
this.sinkContext = new PulsarSinkContextImpl(initContext, sinkConfiguration);
// Initialize topic metadata listener.
LOG.debug("Initialize topic metadata after creating Pulsar writer.");
ProcessingTimeService timeService = initContext.getProcessingTimeService();
this.metadataListener.open(sinkConfiguration, timeService);
// Initialize topic router.
this.topicRouter.open(sinkConfiguration);
// Initialize the serialization schema.
try {
InitializationContext initializationContext =
initContext.asSerializationSchemaInitializationContext();
this.serializationSchema.open(initializationContext, sinkContext, sinkConfiguration);
} catch (Exception e) {
throw new FlinkRuntimeException("Cannot initialize schema.", e);
}
// Create this producer register after opening serialization schema!
this.producerRegister = new TopicProducerRegister(sinkConfiguration);
this.mailboxExecutor = initContext.getMailboxExecutor();
    }

@Override
public void write(IN element, Context context) throws IOException, InterruptedException {
PulsarMessage<?> message = serializationSchema.serialize(element, sinkContext);
// Choose the right topic to send.
String key = message.getKey();
List<String> availableTopics = metadataListener.availableTopics();
String topic = topicRouter.route(element, key, availableTopics, sinkContext);
// Create message builder for sending message.
TypedMessageBuilder<?> builder = createMessageBuilder(topic, context, message);
        // Apply delayed delivery if the message delayer requests it.
long deliverAt = messageDelayer.deliverAt(element, sinkContext);
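        // A non-positive timestamp means the message should be delivered immediately.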
if (deliverAt > 0) {
builder.deliverAt(deliverAt);
}
// Perform message sending.
if (deliveryGuarantee == DeliveryGuarantee.NONE) {
            // Fire and forget: any send exception is ignored, which may cause data loss.
builder.sendAsync();
} else {
// Increase the pending message count.
pendingMessages.incrementAndGet();
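            // The Pulsar client invokes the callback on one of its own threads, so failures are
            // re-thrown through the mailbox executor to fail the job on the Flink task thread.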
builder.sendAsync()
.whenComplete(
(id, ex) -> {
pendingMessages.decrementAndGet();
if (ex != null) {
mailboxExecutor.execute(
() -> {
throw new FlinkRuntimeException(
"Failed to send data to Pulsar " + topic,
ex);
},
"Failed to send data to Pulsar");
} else {
LOG.debug(
"Sent message to Pulsar {} with message id {}",
topic,
id);
}
});
}
    }

@SuppressWarnings({"rawtypes", "unchecked"})
private TypedMessageBuilder<?> createMessageBuilder(
String topic, Context context, PulsarMessage<?> message) {
Schema<?> schema = message.getSchema();
TypedMessageBuilder<?> builder = producerRegister.createMessageBuilder(topic, schema);
byte[] orderingKey = message.getOrderingKey();
if (orderingKey != null && orderingKey.length > 0) {
builder.orderingKey(orderingKey);
}
String key = message.getKey();
if (!Strings.isNullOrEmpty(key)) {
builder.key(key);
}
long eventTime = message.getEventTime();
if (eventTime > 0) {
builder.eventTime(eventTime);
} else {
            // Use Flink's record timestamp as the default event time if one is provided.
Long timestamp = context.timestamp();
if (timestamp != null && timestamp > 0L) {
builder.eventTime(timestamp);
}
}
        // With schema evolution, the value is serialized by the Pulsar Schema inside the
        // TypedMessageBuilder. The value type has already been checked in
        // PulsarMessageBuilder#value, so the raw cast below is safe.
((TypedMessageBuilder) builder).value(message.getValue());
Map<String, String> properties = message.getProperties();
if (properties != null && !properties.isEmpty()) {
builder.properties(properties);
}
Long sequenceId = message.getSequenceId();
if (sequenceId != null) {
builder.sequenceId(sequenceId);
}
List<String> clusters = message.getReplicationClusters();
if (clusters != null && !clusters.isEmpty()) {
builder.replicationClusters(clusters);
}
if (message.isDisableReplication()) {
builder.disableReplication();
}
return builder;
    }

@Override
public void flush(boolean endOfInput) throws IOException {
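        // With DeliveryGuarantee.NONE we only flush at the end of input; for the other
        // guarantees every checkpoint flush must drain the in-flight messages.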
if (endOfInput || deliveryGuarantee != DeliveryGuarantee.NONE) {
LOG.info("Flush the pending messages to Pulsar.");
// Try to flush pending messages.
producerRegister.flush();
            // Make sure all pending messages have been flushed to Pulsar. Each flush() blocks
            // until the producers have sent their buffered messages, so this loop terminates
            // once all send callbacks have fired.
while (pendingMessages.longValue() > 0) {
producerRegister.flush();
}
}
    }

@Override
public Collection<PulsarCommittable> prepareCommit() {
if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
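            // Only EXACTLY_ONCE uses Pulsar transactions, which produce committables that the
            // committer commits after the checkpoint completes.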
return producerRegister.prepareCommit();
} else {
return emptyList();
}
    }

@Override
public void close() throws Exception {
        // Close all resources; closeAll rethrows any collected exception after everything is closed.
closeAll(metadataListener, producerRegister);
}
}