blob: 76d6a1fbce4a3b4f35677b052cab123d9ab20b50 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import org.apache.commons.lang.Validate;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.flink.common.selector.TopicSelector;
import org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The RocketMQSink provides at-least-once reliability guarantees when
* checkpoints are enabled and batchFlushOnCheckpoint(true) is set.
* Otherwise, the sink reliability guarantees depends on rocketmq producer's retry policy.
*/
public class RocketMQSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(RocketMQSink.class);
private transient DefaultMQProducer producer;
private boolean async; // false by default
private Properties props;
private TopicSelector<IN> topicSelector;
private KeyValueSerializationSchema<IN> serializationSchema;
private boolean batchFlushOnCheckpoint; // false by default
private int batchSize = 1000;
private List<Message> batchList;
private int messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL00;
public RocketMQSink(KeyValueSerializationSchema<IN> schema, TopicSelector<IN> topicSelector, Properties props) {
this.serializationSchema = schema;
this.topicSelector = topicSelector;
this.props = props;
if (this.props != null) {
this.messageDeliveryDelayLevel = RocketMQUtils.getInteger(this.props, RocketMQConfig.MSG_DELAY_LEVEL,
RocketMQConfig.MSG_DELAY_LEVEL00);
if (this.messageDeliveryDelayLevel < RocketMQConfig.MSG_DELAY_LEVEL00) {
this.messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL00;
} else if (this.messageDeliveryDelayLevel > RocketMQConfig.MSG_DELAY_LEVEL18) {
this.messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL18;
}
}
}
@Override
public void open(Configuration parameters) throws Exception {
Validate.notEmpty(props, "Producer properties can not be empty");
Validate.notNull(topicSelector, "TopicSelector can not be null");
Validate.notNull(serializationSchema, "KeyValueSerializationSchema can not be null");
producer = new DefaultMQProducer(RocketMQConfig.buildAclRPCHook(props));
producer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()) + "_" + UUID.randomUUID());
RocketMQConfig.buildProducerConfigs(props, producer);
batchList = new LinkedList<>();
if (batchFlushOnCheckpoint && !((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) {
LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
batchFlushOnCheckpoint = false;
}
try {
producer.start();
} catch (MQClientException e) {
throw new RuntimeException(e);
}
}
@Override
public void invoke(IN input, Context context) throws Exception {
Message msg = prepareMessage(input);
if (batchFlushOnCheckpoint) {
batchList.add(msg);
if (batchList.size() >= batchSize) {
flushSync();
}
return;
}
if (async) {
try {
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
LOG.debug("Async send message success! result: {}", sendResult);
}
@Override
public void onException(Throwable throwable) {
if (throwable != null) {
LOG.error("Async send message failure!", throwable);
}
}
});
} catch (Exception e) {
LOG.error("Async send message failure!", e);
}
} else {
try {
SendResult result = producer.send(msg);
LOG.debug("Sync send message result: {}", result);
if (result.getSendStatus() != SendStatus.SEND_OK) {
throw new RemotingException(result.toString());
}
} catch (Exception e) {
LOG.error("Sync send message failure!", e);
throw e;
}
}
}
private Message prepareMessage(IN input) {
String topic = topicSelector.getTopic(input);
String tag = (tag = topicSelector.getTag(input)) != null ? tag : "";
byte[] k = serializationSchema.serializeKey(input);
String key = k != null ? new String(k, StandardCharsets.UTF_8) : "";
byte[] value = serializationSchema.serializeValue(input);
Validate.notNull(topic, "the message topic is null");
Validate.notNull(value, "the message body is null");
Message msg = new Message(topic, tag, key, value);
if (this.messageDeliveryDelayLevel > RocketMQConfig.MSG_DELAY_LEVEL00) {
msg.setDelayTimeLevel(this.messageDeliveryDelayLevel);
}
return msg;
}
public RocketMQSink<IN> withAsync(boolean async) {
this.async = async;
return this;
}
public RocketMQSink<IN> withBatchFlushOnCheckpoint(boolean batchFlushOnCheckpoint) {
this.batchFlushOnCheckpoint = batchFlushOnCheckpoint;
return this;
}
public RocketMQSink<IN> withBatchSize(int batchSize) {
this.batchSize = batchSize;
return this;
}
@Override
public void close() throws Exception {
if (producer != null) {
try {
flushSync();
} catch (Exception e) {
LOG.error("FlushSync failure!", e);
}
// make sure producer can be shutdown, thus current producerGroup will be unregistered
producer.shutdown();
}
}
private void flushSync() throws Exception {
if (batchFlushOnCheckpoint) {
synchronized (batchList) {
if (batchList.size() > 0) {
producer.send(batchList);
batchList.clear();
}
}
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
flushSync();
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// Nothing to do
}
}