blob: 8b338e3b219eb1eaace2159a4d39d1d1da80050c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink.legacy;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.flink.legacy.common.serialization.KeyValueDeserializationSchema;
import org.apache.rocketmq.flink.legacy.common.util.MetricUtils;
import org.apache.rocketmq.flink.legacy.common.util.RetryUtil;
import org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils;
import org.apache.rocketmq.flink.legacy.common.watermark.WaterMarkForAll;
import org.apache.rocketmq.flink.legacy.common.watermark.WaterMarkPerQueue;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.curator4.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.lang.Validate;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_BATCH_SIZE;
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_EARLIEST;
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_TIMESTAMP;
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE;
import static org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getInteger;
import static org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getLong;
/**
* The RocketMQSource is based on RocketMQ pull consumer mode, and provides exactly once reliability
* guarantees when checkpoints are enabled. Otherwise, the source doesn't provide any reliability
* guarantees.
*/
public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT>
        implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {
    private static final long serialVersionUID = 1L;

    private static final Logger log = LoggerFactory.getLogger(RocketMQSourceFunction.class);

    /** Name of the union operator state that stores (queue, offset) pairs across restarts. */
    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";

    /** Shared running flag polled by every pull thread and by the main run() loop. */
    private RunningChecker runningChecker;

    private transient DefaultMQPullConsumer consumer;

    /** Turns raw RocketMQ key/value bytes into the output record type. */
    private KeyValueDeserializationSchema<OUT> schema;

    /** Union-redistributed operator state holding the last snapshotted offsets. */
    private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;

    /** Current in-memory consume position per message queue owned by this subtask. */
    private Map<MessageQueue, Long> offsetTable;

    /** Offsets recovered from a checkpoint; merged into {@link #offsetTable} in open(). */
    private Map<MessageQueue, Long> restoredOffsets;

    /** The message queues allocated to this subtask. */
    private List<MessageQueue> messageQueues;

    /** Runs one blocking pull loop per allocated message queue. */
    private ExecutorService executor;

    // watermark in source
    private WaterMarkPerQueue waterMarkPerQueue;
    private WaterMarkForAll waterMarkForAll;

    /** Periodically emits the current global watermark. */
    private ScheduledExecutorService timer;

    /** Data for pending but uncommitted offsets, keyed by checkpoint id (insertion ordered). */
    private LinkedMap pendingOffsetsToCommit;

    private Properties props;
    private String topic;
    private String group;

    /** Whether this task was restored from a previous checkpoint. */
    private transient volatile boolean restored;

    private transient boolean enableCheckpoint;

    /**
     * Guards the atomicity of record emission plus offset update with respect to checkpoints.
     * Declared as Object but instantiated as a ReentrantLock and used via synchronized blocks.
     */
    private volatile Object checkPointLock;

    private Meter tpsMetric;

    /**
     * Creates a RocketMQ pull source.
     *
     * @param schema deserializer from RocketMQ key/body bytes to the output type
     * @param props consumer configuration (topic, group, name server, filters, ...)
     */
    public RocketMQSourceFunction(KeyValueDeserializationSchema<OUT> schema, Properties props) {
        this.schema = schema;
        this.props = props;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        log.debug("source open....");
        Validate.notEmpty(props, "Consumer properties can not be empty");
        this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
        this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);
        Validate.notEmpty(topic, "Consumer topic can not be empty");
        Validate.notEmpty(group, "Consumer group can not be empty");
        String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG);
        String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL);
        // Tag filtering and SQL filtering are mutually exclusive. Note the negation: the
        // check must FAIL only when both are set. (The previous condition was inverted and
        // rejected every configuration that did not set both.)
        Validate.isTrue(
                !(StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(sql)),
                "Consumer tag and sql can not set value at the same time");
        this.enableCheckpoint =
                ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();
        if (offsetTable == null) {
            offsetTable = new ConcurrentHashMap<>();
        }
        if (restoredOffsets == null) {
            restoredOffsets = new ConcurrentHashMap<>();
        }
        // use restoredOffsets to init offset table.
        initOffsetTableFromRestoredOffsets();
        if (pendingOffsetsToCommit == null) {
            pendingOffsetsToCommit = new LinkedMap();
        }
        if (checkPointLock == null) {
            checkPointLock = new ReentrantLock();
        }
        if (waterMarkPerQueue == null) {
            waterMarkPerQueue = new WaterMarkPerQueue(5000);
        }
        if (waterMarkForAll == null) {
            waterMarkForAll = new WaterMarkForAll(5000);
        }
        if (timer == null) {
            timer = Executors.newSingleThreadScheduledExecutor();
        }
        runningChecker = new RunningChecker();
        runningChecker.setRunning(true);
        final ThreadFactory threadFactory =
                new ThreadFactoryBuilder()
                        .setDaemon(true)
                        .setNameFormat("rmq-pull-thread-%d")
                        .build();
        executor = Executors.newCachedThreadPool(threadFactory);
        int indexOfThisSubTask = getRuntimeContext().getIndexOfThisSubtask();
        consumer = new DefaultMQPullConsumer(group, RocketMQConfig.buildAclRPCHook(props));
        RocketMQConfig.buildConsumerConfigs(props, consumer);
        // set unique instance name, avoid exception:
        // https://help.aliyun.com/document_detail/29646.html
        String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
        String instanceName =
                RocketMQUtils.getInstanceName(
                        runtimeName,
                        topic,
                        group,
                        String.valueOf(indexOfThisSubTask),
                        String.valueOf(System.nanoTime()));
        consumer.setInstanceName(instanceName);
        consumer.start();
        Counter outputCounter =
                getRuntimeContext()
                        .getMetricGroup()
                        .counter(MetricUtils.METRICS_TPS + "_counter", new SimpleCounter());
        tpsMetric =
                getRuntimeContext()
                        .getMetricGroup()
                        .meter(MetricUtils.METRICS_TPS, new MeterView(outputCounter, 60));
    }

    @Override
    public void run(SourceContext<OUT> context) throws Exception {
        String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL);
        String tag =
                props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);
        int pullBatchSize = getInteger(props, CONSUMER_BATCH_SIZE, DEFAULT_CONSUMER_BATCH_SIZE);
        final RuntimeContext ctx = getRuntimeContext();
        // The lock that guarantees that record emission and state updates are atomic,
        // from the view of taking a checkpoint.
        int taskNumber = ctx.getNumberOfParallelSubtasks();
        int taskIndex = ctx.getIndexOfThisSubtask();
        log.info("Source run, NumberOfTotalTask={}, IndexOfThisSubTask={}", taskNumber, taskIndex);
        // Periodically emit one global watermark derived from the max event time seen so far.
        timer.scheduleAtFixedRate(
                () -> {
                    // context.emitWatermark(waterMarkPerQueue.getCurrentWatermark());
                    context.emitWatermark(waterMarkForAll.getCurrentWatermark());
                },
                5,
                5,
                TimeUnit.SECONDS);
        Collection<MessageQueue> totalQueues = consumer.fetchSubscribeMessageQueues(topic);
        messageQueues = RocketMQUtils.allocate(totalQueues, taskNumber, taskIndex);
        // One dedicated pull loop per allocated queue; RetryUtil restarts it on failure.
        for (MessageQueue mq : messageQueues) {
            this.executor.execute(
                    () ->
                            RetryUtil.call(
                                    () -> pullLoop(context, mq, sql, tag, pullBatchSize),
                                    "RuntimeException"));
        }
        awaitTermination();
    }

    /**
     * Continuously pulls one message queue until the source is cancelled, emitting records and
     * advancing the queue offset. Record emission and offset update are performed under
     * {@link #checkPointLock} so snapshots see a consistent view.
     *
     * @return {@code true} when the loop exits normally (source stopped)
     */
    private Boolean pullLoop(
            SourceContext<OUT> context, MessageQueue mq, String sql, String tag, int pullBatchSize)
            throws Exception {
        while (runningChecker.isRunning()) {
            try {
                long offset = getMessageQueueOffset(mq);
                PullResult pullResult;
                if (StringUtils.isEmpty(sql)) {
                    pullResult = consumer.pullBlockIfNotFound(mq, tag, offset, pullBatchSize);
                } else {
                    // pull with sql does not support block pull.
                    pullResult =
                            consumer.pull(mq, MessageSelector.bySql(sql), offset, pullBatchSize);
                }
                boolean found = false;
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        List<MessageExt> messages = pullResult.getMsgFoundList();
                        for (MessageExt msg : messages) {
                            byte[] key =
                                    msg.getKeys() != null
                                            ? msg.getKeys().getBytes(StandardCharsets.UTF_8)
                                            : null;
                            byte[] value = msg.getBody();
                            OUT data = schema.deserializeKeyAndValue(key, value);
                            // output and state update are atomic
                            synchronized (checkPointLock) {
                                log.debug(
                                        msg.getMsgId()
                                                + "_"
                                                + msg.getBrokerName()
                                                + " "
                                                + msg.getQueueId()
                                                + " "
                                                + msg.getQueueOffset());
                                context.collectWithTimestamp(data, msg.getBornTimestamp());
                                // update max eventTime per queue
                                // waterMarkPerQueue.extractTimestamp(mq, msg.getBornTimestamp());
                                waterMarkForAll.extractTimestamp(msg.getBornTimestamp());
                                tpsMetric.markEvent();
                            }
                        }
                        found = true;
                        break;
                    case NO_MATCHED_MSG:
                        log.debug("No matched message after offset {} for queue {}", offset, mq);
                        break;
                    case NO_NEW_MSG:
                        log.debug("No new message after offset {} for queue {}", offset, mq);
                        break;
                    case OFFSET_ILLEGAL:
                        log.warn("Offset {} is illegal for queue {}", offset, mq);
                        break;
                    default:
                        break;
                }
                synchronized (checkPointLock) {
                    updateMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                }
                if (!found) {
                    // Back off when nothing was found to avoid a busy loop.
                    RetryUtil.waitForMs(
                            RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return true;
    }

    /** Blocks the main source thread until the source is cancelled. */
    private void awaitTermination() throws InterruptedException {
        while (runningChecker.isRunning()) {
            Thread.sleep(50);
        }
    }

    /**
     * Returns the consume offset for {@code mq}, resolving it on first access: from the restored
     * offset table if present, otherwise from the broker, otherwise from the configured
     * CONSUMER_OFFSET_RESET_TO strategy.
     */
    private long getMessageQueueOffset(MessageQueue mq) throws MQClientException {
        Long offset = offsetTable.get(mq);
        // restoredOffsets(unionOffsetStates) is the restored global union state;
        // should only snapshot mqs that actually belong to us
        if (offset == null) {
            // fetchConsumeOffset from broker
            offset = consumer.fetchConsumeOffset(mq, false);
            if (!restored || offset < 0) {
                String initialOffset =
                        props.getProperty(
                                RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
                switch (initialOffset) {
                    case CONSUMER_OFFSET_EARLIEST:
                        offset = consumer.minOffset(mq);
                        break;
                    case CONSUMER_OFFSET_LATEST:
                        offset = consumer.maxOffset(mq);
                        break;
                    case CONSUMER_OFFSET_TIMESTAMP:
                        offset =
                                consumer.searchOffset(
                                        mq,
                                        getLong(
                                                props,
                                                RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP,
                                                System.currentTimeMillis()));
                        break;
                    default:
                        throw new IllegalArgumentException(
                                "Unknown value for CONSUMER_OFFSET_RESET_TO.");
                }
            }
        }
        offsetTable.put(mq, offset);
        return offsetTable.get(mq);
    }

    /**
     * Records the next consume offset for {@code mq}. When checkpointing is disabled the offset is
     * committed to the broker immediately; otherwise it is committed on checkpoint completion.
     */
    private void updateMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException {
        offsetTable.put(mq, offset);
        if (!enableCheckpoint) {
            consumer.updateConsumeOffset(mq, offset);
        }
    }

    @Override
    public void cancel() {
        log.debug("cancel ...");
        runningChecker.setRunning(false);
        if (timer != null) {
            timer.shutdown();
            timer = null;
        }
        if (executor != null) {
            executor.shutdown();
            executor = null;
        }
        if (consumer != null) {
            consumer.shutdown();
            consumer = null;
        }
        if (offsetTable != null) {
            offsetTable.clear();
            offsetTable = null;
        }
        if (restoredOffsets != null) {
            restoredOffsets.clear();
            restoredOffsets = null;
        }
        if (pendingOffsetsToCommit != null) {
            pendingOffsetsToCommit.clear();
            pendingOffsetsToCommit = null;
        }
    }

    @Override
    public void close() throws Exception {
        log.debug("close ...");
        // pretty much the same logic as cancelling
        try {
            cancel();
        } finally {
            super.close();
        }
    }

    /**
     * Seeds {@link #offsetTable} from {@link #restoredOffsets}, keeping the larger offset when a
     * queue is present in both maps.
     */
    public void initOffsetTableFromRestoredOffsets() {
        Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");
        restoredOffsets.forEach(
                (mq, offset) -> {
                    if (!offsetTable.containsKey(mq) || offsetTable.get(mq) < offset) {
                        offsetTable.put(mq, offset);
                    }
                });
        // NOTE: the placeholder was missing, so offsetTable never appeared in the log output.
        log.info("init offset table from restoredOffsets successful. {}", offsetTable);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // called when a snapshot for a checkpoint is requested
        log.info("Snapshotting state {} ...", context.getCheckpointId());
        if (!runningChecker.isRunning()) {
            log.info("snapshotState() called on closed source; skipping snapshot.");
            return;
        }
        // Discovery topic Route change when snapshot: fail the snapshot (RetryUtil retries)
        // if this subtask's queue allocation no longer matches the broker's current routing.
        RetryUtil.call(
                () -> {
                    Collection<MessageQueue> totalQueues =
                            consumer.fetchSubscribeMessageQueues(topic);
                    int taskNumber = getRuntimeContext().getNumberOfParallelSubtasks();
                    int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
                    List<MessageQueue> newQueues =
                            RocketMQUtils.allocate(totalQueues, taskNumber, taskIndex);
                    Collections.sort(newQueues);
                    if (!messageQueues.equals(newQueues)) {
                        throw new RuntimeException();
                    }
                    // Only log "same" after the check actually passed.
                    log.debug("{} Topic route is same.", taskIndex);
                    return true;
                },
                "RuntimeException due to topic route changed");
        unionOffsetStates.clear();
        HashMap<MessageQueue, Long> currentOffsets = new HashMap<>(offsetTable.size());
        for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
            unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
            currentOffsets.put(entry.getKey(), entry.getValue());
        }
        pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
        log.info(
                "Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
                offsetTable,
                context.getCheckpointId(),
                context.getCheckpointTimestamp());
    }

    /**
     * called every time the user-defined function is initialized, be that when the function is
     * first initialized or be that when the function is actually recovering from an earlier
     * checkpoint. Given this, initializeState() is not only the place where different types of
     * state are initialized, but also where state recovery logic is included.
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        log.info("initialize State ...");
        this.unionOffsetStates =
                context.getOperatorStateStore()
                        .getUnionListState(
                                new ListStateDescriptor<>(
                                        OFFSETS_STATE_NAME,
                                        TypeInformation.of(
                                                new TypeHint<Tuple2<MessageQueue, Long>>() {})));
        this.restored = context.isRestored();
        if (restored) {
            if (restoredOffsets == null) {
                restoredOffsets = new ConcurrentHashMap<>();
            }
            // Union state delivers offsets of ALL subtasks; keep the max offset per queue.
            for (Tuple2<MessageQueue, Long> mqOffsets : unionOffsetStates.get()) {
                if (!restoredOffsets.containsKey(mqOffsets.f0)
                        || restoredOffsets.get(mqOffsets.f0) < mqOffsets.f1) {
                    restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
                }
            }
            log.info(
                    "Setting restore state in the consumer. Using the following offsets: {}",
                    restoredOffsets);
        } else {
            log.info("No restore state for the consumer.");
        }
    }

    @Override
    public TypeInformation<OUT> getProducedType() {
        return schema.getProducedType();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // callback when checkpoint complete: commit the offsets snapshotted for this checkpoint.
        if (!runningChecker.isRunning()) {
            log.info("notifyCheckpointComplete() called on closed source; skipping commit.");
            return;
        }
        final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
        if (posInMap == -1) {
            log.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
            return;
        }
        @SuppressWarnings("unchecked")
        Map<MessageQueue, Long> offsets =
                (Map<MessageQueue, Long>) pendingOffsetsToCommit.remove(posInMap);
        // remove older checkpoints in map; they can never be committed anymore
        for (int i = 0; i < posInMap; i++) {
            pendingOffsetsToCommit.remove(0);
        }
        if (offsets == null || offsets.size() == 0) {
            log.debug("Checkpoint state was empty.");
            return;
        }
        for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
            consumer.updateConsumeOffset(entry.getKey(), entry.getValue());
        }
    }
}