blob: fa578396d5ceb851c2f763bca0bde957734d9d28 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink.legacy;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy;
import org.apache.rocketmq.flink.legacy.common.config.StartupMode;
import org.apache.rocketmq.flink.legacy.common.serialization.KeyValueDeserializationSchema;
import org.apache.rocketmq.flink.legacy.common.util.MetricUtils;
import org.apache.rocketmq.flink.legacy.common.util.RetryUtil;
import org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils;
import org.apache.rocketmq.flink.legacy.common.watermark.WaterMarkForAll;
import org.apache.rocketmq.flink.legacy.common.watermark.WaterMarkPerQueue;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.curator5.com.google.common.collect.Lists;
import org.apache.flink.shaded.curator5.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.lang.Validate;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_BATCH_SIZE;
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_TIMEOUT;
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE;
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_TIMEOUT;
import static org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getInteger;
/**
 * The RocketMQSource is based on RocketMQ pull consumer mode, and provides exactly once reliability
 * guarantees when checkpoints are enabled. Otherwise, the source doesn't provide any reliability
 * guarantees.
 *
 * <p>Each parallel subtask is assigned a subset of the topic's message queues (via {@code
 * RocketMQUtils.allocate}) and pulls every assigned queue on its own thread. Records of a polled
 * batch are emitted, and the queue's tracked offset is updated, inside one critical section on
 * {@code SourceContext#getCheckpointLock()}. This is the lock the {@code SourceFunction} contract
 * requires, so that a snapshot never sees emitted records without their matching offset.
 */
public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT>
        implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {

    private static final long serialVersionUID = 1L;

    private static final Logger log = LoggerFactory.getLogger(RocketMQSourceFunction.class);
    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";

    /** Stop flag shared by {@link #run}, the pull threads and the checkpoint callbacks. */
    private RunningChecker runningChecker;

    private transient DefaultLitePullConsumer consumer;
    private KeyValueDeserializationSchema<OUT> schema;
    private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;

    /** Offset each pull thread seeks to for its queue; snapshotted on every checkpoint. */
    private Map<MessageQueue, Long> offsetTable;

    /** Highest offset per queue found in the union state when the job was restored. */
    private Map<MessageQueue, Long> restoredOffsets;

    /** Queues assigned to this subtask. */
    private List<MessageQueue> messageQueues;

    private ExecutorService executor;

    // watermark in source
    private WaterMarkPerQueue waterMarkPerQueue;
    private WaterMarkForAll waterMarkForAll;

    private ScheduledExecutorService timer;

    /** Data for pending but uncommitted offsets, keyed by checkpoint id in insertion order. */
    private LinkedMap pendingOffsetsToCommit;

    private Properties props;
    private String topic;
    private String group;

    private transient volatile boolean restored;
    private transient boolean enableCheckpoint;

    private Meter tpsMetric;
    private MetricUtils.TimestampGauge fetchDelay = new MetricUtils.TimestampGauge();
    private MetricUtils.TimestampGauge emitDelay = new MetricUtils.TimestampGauge();

    /** The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
    private StartupMode startMode = StartupMode.GROUP_OFFSETS;

    /**
     * If StartupMode#GROUP_OFFSETS has no commit offset, the OffsetResetStrategy provides the
     * initialization strategy.
     */
    private OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.LATEST;

    /**
     * Specific startup offsets; only relevant when startup mode is {@link
     * StartupMode#SPECIFIC_OFFSETS}.
     */
    private Map<MessageQueue, Long> specificStartupOffsets;

    /** Startup timestamp; only relevant when startup mode is {@link StartupMode#TIMESTAMP}. */
    private long specificTimeStamp;

    public RocketMQSourceFunction(KeyValueDeserializationSchema<OUT> schema, Properties props) {
        this.schema = schema;
        this.props = props;
    }

    /**
     * Validates the consumer properties, starts the RocketMQ consumer, registers metrics and
     * computes the starting offset of every queue assigned to this subtask.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        log.debug("source open....");
        Validate.notEmpty(props, "Consumer properties can not be empty");

        this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
        this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);

        Validate.notEmpty(topic, "Consumer topic can not be empty");
        Validate.notEmpty(group, "Consumer group can not be empty");

        String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG);
        String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL);
        Validate.isTrue(
                !(StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(sql)),
                "Consumer tag and sql can not set value at the same time");

        this.enableCheckpoint =
                ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();

        if (offsetTable == null) {
            offsetTable = new ConcurrentHashMap<>();
        }
        if (restoredOffsets == null) {
            restoredOffsets = new ConcurrentHashMap<>();
        }
        if (pendingOffsetsToCommit == null) {
            pendingOffsetsToCommit = new LinkedMap();
        }
        if (waterMarkPerQueue == null) {
            waterMarkPerQueue = new WaterMarkPerQueue(5000);
        }
        if (waterMarkForAll == null) {
            waterMarkForAll = new WaterMarkForAll(5000);
        }
        if (timer == null) {
            timer = Executors.newSingleThreadScheduledExecutor();
        }

        runningChecker = new RunningChecker();
        runningChecker.setRunning(true);

        final ThreadFactory threadFactory =
                new ThreadFactoryBuilder()
                        .setDaemon(true)
                        .setNameFormat("rmq-pull-thread-%d")
                        .build();
        executor = Executors.newCachedThreadPool(threadFactory);

        final RuntimeContext ctx = getRuntimeContext();
        int taskNumber = ctx.getNumberOfParallelSubtasks();
        int taskIndex = ctx.getIndexOfThisSubtask();

        consumer = new DefaultLitePullConsumer(group, RocketMQConfig.buildAclRPCHook(props));
        RocketMQConfig.buildConsumerConfigs(props, consumer);

        // set unique instance name, avoid exception:
        // https://help.aliyun.com/document_detail/29646.html
        String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
        String instanceName =
                RocketMQUtils.getInstanceName(
                        runtimeName,
                        topic,
                        group,
                        String.valueOf(taskIndex),
                        String.valueOf(System.nanoTime()));
        consumer.setInstanceName(instanceName);
        consumer.start();

        registerMetrics();

        log.info("Source run, NumberOfTotalTask={}, IndexOfThisSubTask={}", taskNumber, taskIndex);
        Collection<MessageQueue> totalQueues = consumer.fetchMessageQueues(topic);
        messageQueues = RocketMQUtils.allocate(totalQueues, taskNumber, taskIndex);

        // If the job recovers from the state, the state has already contained the offsets of last
        // commit.
        if (restored) {
            initOffsetTableFromRestoredOffsets(messageQueues);
            initOffsetsForQueuesWithoutState(messageQueues);
        } else {
            initOffsets(messageQueues);
        }
    }

    /** Registers the TPS meter and the fetch/emit event-time lag gauges. */
    private void registerMetrics() {
        Counter outputCounter =
                getRuntimeContext()
                        .getMetricGroup()
                        .counter(MetricUtils.METRICS_TPS + "_counter", new SimpleCounter());
        tpsMetric =
                getRuntimeContext()
                        .getMetricGroup()
                        .meter(MetricUtils.METRICS_TPS, new MeterView(outputCounter, 60));

        getRuntimeContext()
                .getMetricGroup()
                .gauge(MetricUtils.CURRENT_FETCH_EVENT_TIME_LAG, fetchDelay);
        getRuntimeContext()
                .getMetricGroup()
                .gauge(MetricUtils.CURRENT_EMIT_EVENT_TIME_LAG, emitDelay);
    }

    /**
     * After a restore, initializes every assigned queue that has no offset in the restored state,
     * for example a queue added to the topic after the checkpoint was taken, using the configured
     * startup mode. Without this, the pull thread for such a queue would unbox a {@code null}
     * offset when seeking and fail on every retry.
     */
    private void initOffsetsForQueuesWithoutState(List<MessageQueue> assignedQueues)
            throws MQClientException {
        List<MessageQueue> queuesWithoutState = Lists.newArrayList();
        for (MessageQueue mq : assignedQueues) {
            if (!offsetTable.containsKey(mq)) {
                queuesWithoutState.add(mq);
            }
        }
        if (!queuesWithoutState.isEmpty()) {
            log.info(
                    "No restored offsets for queues {}, initializing them with startup mode {}.",
                    queuesWithoutState,
                    startMode);
            initOffsets(queuesWithoutState);
        }
    }

    /**
     * Subscribes the consumer, starts the periodic watermark emitter and one pull thread per
     * assigned queue, then blocks until the source is cancelled.
     */
    @Override
    public void run(SourceContext<OUT> context) throws Exception {
        String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL);
        String tag =
                props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);
        int pullBatchSize = getInteger(props, CONSUMER_BATCH_SIZE, DEFAULT_CONSUMER_BATCH_SIZE);
        long pollTimeoutMs = getInteger(props, CONSUMER_TIMEOUT, DEFAULT_CONSUMER_TIMEOUT);

        // Flink's checkpoint lock: snapshots must not interleave with emission + offset update.
        final Object checkpointLock = context.getCheckpointLock();

        timer.scheduleAtFixedRate(
                () -> context.emitWatermark(waterMarkForAll.getCurrentWatermark()),
                5,
                5,
                TimeUnit.SECONDS);

        if (StringUtils.isEmpty(sql)) {
            consumer.subscribe(topic, tag);
        } else {
            // pull with sql do not support block pull.
            consumer.subscribe(topic, MessageSelector.bySql(sql));
        }
        // Loop-invariant: configure the batch size once instead of on every poll.
        consumer.setPullBatchSize(pullBatchSize);

        for (MessageQueue mq : messageQueues) {
            this.executor.execute(
                    () ->
                            RetryUtil.call(
                                    () -> {
                                        while (runningChecker.isRunning()) {
                                            try {
                                                boolean found =
                                                        pullOnce(
                                                                mq,
                                                                context,
                                                                checkpointLock,
                                                                pollTimeoutMs);
                                                if (!found) {
                                                    RetryUtil.waitForMs(
                                                            RocketMQConfig
                                                                    .DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);
                                                }
                                            } catch (Exception e) {
                                                throw new RuntimeException(e);
                                            }
                                        }
                                        return true;
                                    },
                                    "RuntimeException",
                                    runningChecker));
        }

        awaitTermination();
    }

    /**
     * Performs one pull round for {@code mq}. It seeks to the tracked offset, polls a batch and
     * deserializes it outside the lock. Then, while holding {@code checkpointLock}, it emits the
     * batch and records the queue's new offset in a single critical section.
     *
     * @return {@code true} if the poll returned at least one message
     */
    private boolean pullOnce(
            MessageQueue mq, SourceContext<OUT> context, Object checkpointLock, long pollTimeoutMs)
            throws Exception {
        Long offset = offsetTable.get(mq);
        consumer.seek(mq, offset);

        List<MessageExt> messages = consumer.poll(pollTimeoutMs);
        boolean found = CollectionUtils.isNotEmpty(messages);
        long fetchTime = System.currentTimeMillis();

        // Deserialize before taking the checkpoint lock to keep the critical section short.
        List<OUT> records = Lists.newArrayListWithCapacity(found ? messages.size() : 0);
        if (found) {
            for (MessageExt msg : messages) {
                byte[] key =
                        msg.getKeys() != null
                                ? msg.getKeys().getBytes(StandardCharsets.UTF_8)
                                : null;
                records.add(schema.deserializeKeyAndValue(key, msg.getBody()));
            }
        }

        // A consumer call; kept out of the checkpoint lock so it cannot stall checkpoints.
        long nextOffset = consumer.committed(mq);

        // output and state update are atomic
        synchronized (checkpointLock) {
            for (int i = 0; i < records.size(); i++) {
                emitRecord(context, messages.get(i), records.get(i), fetchTime);
            }
            updateMessageQueueOffset(mq, nextOffset);
        }
        return found;
    }

    /** Emits one record and updates watermark and metric bookkeeping; caller holds the lock. */
    private void emitRecord(
            SourceContext<OUT> context, MessageExt msg, OUT record, long fetchTime) {
        log.debug(
                "{}_{} {} {}",
                msg.getMsgId(),
                msg.getBrokerName(),
                msg.getQueueId(),
                msg.getQueueOffset());
        context.collectWithTimestamp(record, msg.getBornTimestamp());
        long emitTime = System.currentTimeMillis();

        // update max eventTime for all queues
        waterMarkForAll.extractTimestamp(msg.getBornTimestamp());
        tpsMetric.markEvent();

        long eventTime = msg.getStoreTimestamp();
        fetchDelay.report(Math.abs(fetchTime - eventTime));
        emitDelay.report(Math.abs(emitTime - eventTime));
    }

    /** Blocks the calling thread until the running flag is cleared by {@link #cancel()}. */
    private void awaitTermination() throws InterruptedException {
        while (runningChecker.isRunning()) {
            Thread.sleep(50);
        }
    }

    /**
     * only flink job start with no state can init offsets from broker
     *
     * @param messageQueues queues whose starting offsets should be resolved
     * @throws MQClientException if the consumer fails to look up an offset
     */
    private void initOffsets(List<MessageQueue> messageQueues) throws MQClientException {
        for (MessageQueue mq : messageQueues) {
            long offset;
            switch (startMode) {
                case LATEST:
                    consumer.seekToEnd(mq);
                    offset = consumer.committed(mq);
                    break;
                case EARLIEST:
                    consumer.seekToBegin(mq);
                    offset = consumer.committed(mq);
                    break;
                case GROUP_OFFSETS:
                    offset = consumer.committed(mq);
                    // the min offset return if consumer group first join,return a negative number
                    // if
                    // catch exception when fetch from broker.
                    // If you want consumer from earliest,please use OffsetResetStrategy.EARLIEST
                    if (offset <= 0) {
                        switch (offsetResetStrategy) {
                            case LATEST:
                                consumer.seekToEnd(mq);
                                offset = consumer.committed(mq);
                                log.info(
                                        "current consumer thread:{} has no committed offset,use Strategy:{} instead",
                                        mq,
                                        offsetResetStrategy);
                                break;
                            case EARLIEST:
                                log.info(
                                        "current consumer thread:{} has no committed offset,use Strategy:{} instead",
                                        mq,
                                        offsetResetStrategy);
                                consumer.seekToBegin(mq);
                                offset = consumer.committed(mq);
                                break;
                            default:
                                break;
                        }
                    }
                    break;
                case TIMESTAMP:
                    offset = consumer.offsetForTimestamp(mq, specificTimeStamp);
                    break;
                case SPECIFIC_OFFSETS:
                    if (specificStartupOffsets == null) {
                        throw new RuntimeException(
                                "StartMode is specific_offsets.But none offsets has been specified");
                    }
                    Long specificOffset = specificStartupOffsets.get(mq);
                    if (specificOffset != null) {
                        offset = specificOffset;
                    } else {
                        offset = consumer.committed(mq);
                    }
                    break;
                default:
                    throw new IllegalArgumentException(
                            "current startMode is not supported" + startMode);
            }
            log.info(
                    "current consumer queue:{} start from offset of: {}",
                    mq.getBrokerName() + "-" + mq.getQueueId(),
                    offset);
            offsetTable.put(mq, offset);
        }
    }

    /** consume from the min offset at every restart with no state */
    public RocketMQSourceFunction<OUT> setStartFromEarliest() {
        this.startMode = StartupMode.EARLIEST;
        return this;
    }

    /** consume from the max offset of each broker's queue at every restart with no state */
    public RocketMQSourceFunction<OUT> setStartFromLatest() {
        this.startMode = StartupMode.LATEST;
        return this;
    }

    /** consume from the closest offset */
    public RocketMQSourceFunction<OUT> setStartFromTimeStamp(long timeStamp) {
        this.startMode = StartupMode.TIMESTAMP;
        this.specificTimeStamp = timeStamp;
        return this;
    }

    /** consume from the group offsets those was stored in brokers. */
    public RocketMQSourceFunction<OUT> setStartFromGroupOffsets() {
        this.startMode = StartupMode.GROUP_OFFSETS;
        return this;
    }

    /**
     * consume from the group offsets those was stored in brokers. If there is no committed
     * offset,#{@link OffsetResetStrategy} would provide initialization policy.
     */
    public RocketMQSourceFunction<OUT> setStartFromGroupOffsets(
            OffsetResetStrategy offsetResetStrategy) {
        this.startMode = StartupMode.GROUP_OFFSETS;
        this.offsetResetStrategy = offsetResetStrategy;
        return this;
    }

    /**
     * consume from the specific offset. Group offsets is enable while the broker didn't specify
     * offset.
     */
    public RocketMQSourceFunction<OUT> setStartFromSpecificOffsets(
            Map<MessageQueue, Long> specificOffsets) {
        this.specificStartupOffsets = specificOffsets;
        this.startMode = StartupMode.SPECIFIC_OFFSETS;
        return this;
    }

    /**
     * Records {@code offset} for {@code mq}. When checkpointing is disabled, the offset is also
     * pushed to the consumer's offset store, because no checkpoint callback will do it.
     */
    private void updateMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException {
        offsetTable.put(mq, offset);
        if (!enableCheckpoint) {
            consumer.getOffsetStore().updateOffset(mq, offset, false);
        }
    }

    /**
     * Stops the pull loop and releases the timer, executor, consumer and offset bookkeeping. It is
     * safe to call even if {@link #open} failed before the running flag was created.
     */
    @Override
    public void cancel() {
        log.debug("cancel ...");
        if (runningChecker != null) {
            runningChecker.setRunning(false);
        }

        if (timer != null) {
            timer.shutdown();
            timer = null;
        }

        if (executor != null) {
            executor.shutdown();
            executor = null;
        }

        if (consumer != null) {
            consumer.shutdown();
            consumer = null;
        }

        if (offsetTable != null) {
            offsetTable.clear();
            offsetTable = null;
        }
        if (restoredOffsets != null) {
            restoredOffsets.clear();
            restoredOffsets = null;
        }
        if (pendingOffsetsToCommit != null) {
            pendingOffsetsToCommit.clear();
            pendingOffsetsToCommit = null;
        }
    }

    @Override
    public void close() throws Exception {
        log.debug("close ...");
        // pretty much the same logic as cancelling
        try {
            cancel();
        } finally {
            super.close();
        }
    }

    /** Copies the restored offsets of the given (assigned) queues into the offset table. */
    public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues) {
        Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");
        restoredOffsets.forEach(
                (mq, offset) -> {
                    if (messageQueues.contains(mq)) {
                        offsetTable.put(mq, offset);
                    }
                });
        log.info("init offset table [{}] from restoredOffsets successful.", offsetTable);
    }

    /**
     * Writes the current offsets into the union list state and remembers them for commit on
     * checkpoint completion. If the topic route no longer matches this subtask's assignment after
     * retries, offsets from the previous checkpoint are kept alongside the current ones.
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // called when a snapshot for a checkpoint is requested
        log.info("Snapshotting state {} ...", context.getCheckpointId());
        if (!runningChecker.isRunning()) {
            log.info("snapshotState() called on closed source; returning null.");
            return;
        }

        Map<MessageQueue, Long> currentOffsets;
        try {
            // Discovers topic route change when snapshot
            RetryUtil.call(
                    () -> {
                        Collection<MessageQueue> totalQueues = consumer.fetchMessageQueues(topic);
                        int taskNumber = getRuntimeContext().getNumberOfParallelSubtasks();
                        int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
                        List<MessageQueue> newQueues =
                                RocketMQUtils.allocate(totalQueues, taskNumber, taskIndex);
                        Collections.sort(newQueues);
                        // Compare order-insensitively: sort a copy of the current assignment too.
                        List<MessageQueue> assignedQueues = Lists.newArrayList(messageQueues);
                        Collections.sort(assignedQueues);
                        if (!assignedQueues.equals(newQueues)) {
                            throw new RuntimeException("Topic route changed for topic " + topic);
                        }
                        log.debug("{} Topic route is same.", taskIndex);
                        return true;
                    },
                    "RuntimeException due to topic route changed");

            unionOffsetStates.clear();
            currentOffsets = new HashMap<>(offsetTable.size());
        } catch (RuntimeException e) {
            log.warn("Retry failed multiple times for topic route change, keep previous offset.");
            // If the retry fails for multiple times, the message queue and its offset in the
            // previous checkpoint will be retained.
            List<Tuple2<MessageQueue, Long>> unionOffsets =
                    Lists.newArrayList(unionOffsetStates.get().iterator());
            Map<MessageQueue, Long> queueOffsets = new HashMap<>(unionOffsets.size());
            unionOffsets.forEach(queueOffset -> queueOffsets.put(queueOffset.f0, queueOffset.f1));
            currentOffsets = new HashMap<>(unionOffsets.size() + offsetTable.size());
            currentOffsets.putAll(queueOffsets);
        }

        for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
            unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
            currentOffsets.put(entry.getKey(), entry.getValue());
        }
        pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
        log.info(
                "Snapshot state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
                offsetTable,
                context.getCheckpointId(),
                context.getCheckpointTimestamp());
    }

    /**
     * called every time the user-defined function is initialized, be that when the function is
     * first initialized or be that when the function is actually recovering from an earlier
     * checkpoint. Given this, initializeState() is not only the place where different types of
     * state are initialized, but also where state recovery logic is included.
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        log.info("initialize State ...");

        this.unionOffsetStates =
                context.getOperatorStateStore()
                        .getUnionListState(
                                new ListStateDescriptor<>(
                                        OFFSETS_STATE_NAME,
                                        TypeInformation.of(
                                                new TypeHint<Tuple2<MessageQueue, Long>>() {})));
        this.restored = context.isRestored();

        if (restored) {
            if (restoredOffsets == null) {
                restoredOffsets = new ConcurrentHashMap<>();
            }
            // Union state may hold several entries per queue; keep the largest offset.
            for (Tuple2<MessageQueue, Long> mqOffsets : unionOffsetStates.get()) {
                restoredOffsets.merge(mqOffsets.f0, mqOffsets.f1, Long::max);
            }
            log.info(
                    "Setting restore state in the consumer. Using the following offsets: {}",
                    restoredOffsets);
        } else {
            log.info("No restore state for the consumer.");
        }
    }

    @Override
    public TypeInformation<OUT> getProducedType() {
        return schema.getProducedType();
    }

    /**
     * Commits the offsets recorded for {@code checkpointId} to the consumer's offset store and
     * drops all older pending checkpoints.
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // callback when checkpoint complete
        if (!runningChecker.isRunning()) {
            log.info("notifyCheckpointComplete() called on closed source; returning null.");
            return;
        }

        final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
        if (posInMap == -1) {
            log.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
            return;
        }

        // snapshotState only ever stores Map<MessageQueue, Long> values.
        @SuppressWarnings("unchecked")
        Map<MessageQueue, Long> offsets =
                (Map<MessageQueue, Long>) pendingOffsetsToCommit.remove(posInMap);

        // remove older checkpoints in map
        for (int i = 0; i < posInMap; i++) {
            pendingOffsetsToCommit.remove(0);
        }

        if (offsets == null || offsets.isEmpty()) {
            log.debug("Checkpoint state was empty.");
            return;
        }

        for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
            consumer.getOffsetStore().updateOffset(entry.getKey(), entry.getValue(), false);
            consumer.getOffsetStore().persist(consumer.queueWithNamespace(entry.getKey()));
        }
    }
}