blob: 0c26e326a93e380b6983db81326e2b8fb2954819 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.rabbitmq;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.util.Preconditions;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
/**
* RabbitMQ source (consumer) which reads from a queue and acknowledges messages on checkpoints.
* When checkpointing is enabled, it guarantees exactly-once processing semantics.
*
* <p>RabbitMQ requires messages to be acknowledged. On failures, RabbitMQ will re-resend all messages
* which have not been acknowledged previously. When a failure occurs directly after a completed
* checkpoint, all messages part of this checkpoint might be processed again because they couldn't
* be acknowledged before failure. This case is handled by the {@link MessageAcknowledgingSourceBase}
* base class which deduplicates the messages using the correlation id.
*
* <p>RabbitMQ's Delivery Tags do NOT represent unique ids / offsets. That's why the source uses the
* Correlation ID in the message properties to check for duplicate messages. Note that the
* correlation id has to be set at the producer. If the correlation id is not set, messages may
* be produced more than once in corner cases.
*
* <p>This source can be operated in three different modes:
*
* <p>1) Exactly-once (when checkpointed) with RabbitMQ transactions and messages with
* unique correlation IDs.
* 2) At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism
* (correlation id is not set).
* 3) No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
*
* <p>Users may overwrite the setupConnectionFactory() method to pass their setup their own
* ConnectionFactory in case the constructor parameters are not sufficient.
*
* @param <OUT> The type of the data read from RabbitMQ.
*/
public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long>
implements ResultTypeQueryable<OUT> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class);
private final RMQConnectionConfig rmqConnectionConfig;
protected final String queueName;
private final boolean usesCorrelationId;
protected DeserializationSchema<OUT> schema;
protected boolean autoAck = false;
protected transient Connection connection;
protected transient Channel channel;
protected transient QueueingConsumer consumer;
private transient volatile boolean running;
/**
* Creates a new RabbitMQ source with at-least-once message processing guarantee when
* checkpointing is enabled. No strong delivery guarantees when checkpointing is disabled.
*
* <p>For exactly-once, please use the constructor
* {@link RMQSource#RMQSource(RMQConnectionConfig, String, boolean, DeserializationSchema)}.
* @param rmqConnectionConfig The RabbiMQ connection configuration {@link RMQConnectionConfig}.
* @param queueName The queue to receive messages from.
* @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received
* into Java objects.
*/
public RMQSource(RMQConnectionConfig rmqConnectionConfig, String queueName,
DeserializationSchema<OUT> deserializationSchema) {
this(rmqConnectionConfig, queueName, false, deserializationSchema);
}
/**
* Creates a new RabbitMQ source. For exactly-once, you must set the correlation ids of messages
* at the producer. The correlation id must be unique. Otherwise the behavior of the source is
* undefined. If in doubt, set usesCorrelationId to false. When correlation ids are not
* used, this source has at-least-once processing semantics when checkpointing is enabled.
* @param rmqConnectionConfig The RabbiMQ connection configuration {@link RMQConnectionConfig}.
* @param queueName The queue to receive messages from.
* @param usesCorrelationId Whether the messages received are supplied with a <b>unique</b>
* id to deduplicate messages (in case of failed acknowledgments).
* Only used when checkpointing is enabled.
* @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received
* into Java objects.
*/
public RMQSource(RMQConnectionConfig rmqConnectionConfig,
String queueName, boolean usesCorrelationId, DeserializationSchema<OUT> deserializationSchema) {
this(rmqConnectionConfig, queueName, usesCorrelationId, deserializationSchema, false);
}
public RMQSource(
RMQConnectionConfig rmqConnectionConfig,
String queueName,
boolean usesCorrelationId,
DeserializationSchema<OUT> deserializationSchema,
boolean autoAck) {
super(String.class);
this.rmqConnectionConfig = rmqConnectionConfig;
this.queueName = queueName;
this.usesCorrelationId = usesCorrelationId;
this.schema = deserializationSchema;
this.autoAck = autoAck;
}
/**
* Initializes the connection to RMQ with a default connection factory. The user may override
* this method to setup and configure their own ConnectionFactory.
*/
protected ConnectionFactory setupConnectionFactory() throws Exception {
return rmqConnectionConfig.getConnectionFactory();
}
/**
* Sets up the queue. The default implementation just declares the queue. The user may override
* this method to have a custom setup for the queue (i.e. binding the queue to an exchange or
* defining custom queue parameters)
*/
protected void setupQueue() throws IOException {
channel.queueDeclare(queueName, true, false, false, null);
}
@Override
public void open(Configuration config) throws Exception {
super.open(config);
ConnectionFactory factory = setupConnectionFactory();
try {
connection = factory.newConnection();
channel = connection.createChannel();
if (channel == null) {
throw new RuntimeException("None of RabbitMQ channels are available");
}
setupQueue();
consumer = new QueueingConsumer(channel);
if (!autoAck) {
// enables transaction mode
channel.txSelect();
}
LOG.debug("Starting RabbitMQ source with autoAck status: " + autoAck);
channel.basicConsume(queueName, autoAck, consumer);
} catch (IOException e) {
throw new RuntimeException("Cannot create RMQ connection with " + queueName + " at "
+ rmqConnectionConfig.getHost(), e);
}
running = true;
}
@Override
public void close() throws Exception {
super.close();
try {
if (connection != null) {
connection.close();
}
} catch (IOException e) {
throw new RuntimeException("Error while closing RMQ connection with " + queueName
+ " at " + rmqConnectionConfig.getHost(), e);
}
}
@Override
public void run(SourceContext<OUT> ctx) throws Exception {
while (running) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
synchronized (ctx.getCheckpointLock()) {
OUT result = schema.deserialize(delivery.getBody());
if (schema.isEndOfStream(result)) {
break;
}
if (!autoAck) {
final long deliveryTag = delivery.getEnvelope().getDeliveryTag();
if (usesCorrelationId) {
final String correlationId = delivery.getProperties().getCorrelationId();
Preconditions.checkNotNull(correlationId, "RabbitMQ source was instantiated " +
"with usesCorrelationId set to true but a message was received with " +
"correlation id set to null!");
if (!addId(correlationId)) {
// we have already processed this message
continue;
}
}
sessionIds.add(deliveryTag);
}
ctx.collect(result);
}
}
}
@Override
public void cancel() {
running = false;
}
@Override
protected void acknowledgeSessionIDs(List<Long> sessionIds) {
try {
for (long id : sessionIds) {
channel.basicAck(id, false);
}
channel.txCommit();
} catch (IOException e) {
throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e);
}
}
@Override
public TypeInformation<OUT> getProducedType() {
return schema.getProducedType();
}
}