blob: 8d98d2e1e8d7146125a103dd8126b448d8ff27db [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink.source;
import org.apache.rocketmq.flink.legacy.RocketMQConfig;
import org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumState;
import org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumStateSerializer;
import org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator;
import org.apache.rocketmq.flink.source.reader.RocketMQPartitionSplitReader;
import org.apache.rocketmq.flink.source.reader.RocketMQRecordEmitter;
import org.apache.rocketmq.flink.source.reader.RocketMQSourceReader;
import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema;
import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplitSerializer;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.commons.lang.Validate;
import org.apache.commons.lang3.StringUtils;
import java.util.function.Supplier;
/**
 * The Source implementation of RocketMQ.
 *
 * <p>This class only stores the connection and consumption settings it is constructed with and
 * forwards them to the {@link RocketMQSourceEnumerator} and to the {@link
 * RocketMQPartitionSplitReader} instances it creates.
 *
 * @param <OUT> the type of the records produced by the configured {@link
 *     RocketMQDeserializationSchema}
 */
public class RocketMQSource<OUT>
        implements Source<OUT, RocketMQPartitionSplit, RocketMQSourceEnumState>,
                ResultTypeQueryable<OUT> {

    private static final long serialVersionUID = -1L;

    // Settings forwarded to the enumerator only.
    private final String consumerOffsetMode;
    private final long consumerOffsetTimestamp;

    // Setting forwarded to the split reader only.
    private final long pollTime;

    // Connection / subscription settings.
    private final String topic;
    private final String consumerGroup;
    private final String nameServerAddress;
    private final String tag;
    private final String sql;
    private final String accessKey;
    private final String secretKey;

    // Start / stop positions.
    private final long stopInMs;
    private final long startTime;
    private final long startOffset;

    private final long partitionDiscoveryIntervalMs;

    // Boundedness
    private final Boundedness boundedness;

    private final RocketMQDeserializationSchema<OUT> deserializationSchema;

    /**
     * Creates a new RocketMQ source.
     *
     * @param pollTime forwarded to every {@link RocketMQPartitionSplitReader}
     * @param topic topic to read from
     * @param consumerGroup consumer group used by the enumerator and the split readers
     * @param nameServerAddress name server address used by the enumerator and the split readers
     * @param accessKey access key forwarded to the enumerator and the split readers
     * @param secretKey secret key forwarded to the enumerator and the split readers
     * @param tag consumer tag; when {@code null} or empty, {@link
     *     RocketMQConfig#DEFAULT_CONSUMER_TAG} is used. Must not be set together with {@code sql}
     * @param sql consumer SQL expression; must not be set together with {@code tag}
     * @param stopInMs forwarded to the enumerator and the split readers
     * @param startTime forwarded to the split readers; also used as the start offset when {@code
     *     startOffset} is not positive
     * @param startOffset start offset; only honoured when greater than zero
     * @param partitionDiscoveryIntervalMs forwarded to the enumerator
     * @param boundedness the boundedness reported by {@link #getBoundedness()}
     * @param deserializationSchema schema used to deserialize records and to report the produced
     *     type
     * @param cosumerOffsetMode consumer offset mode forwarded to the enumerator
     * @param consumerOffsetTimestamp consumer offset timestamp forwarded to the enumerator
     * @throws IllegalArgumentException if both {@code tag} and {@code sql} are non-empty
     */
    public RocketMQSource(
            long pollTime,
            String topic,
            String consumerGroup,
            String nameServerAddress,
            String accessKey,
            String secretKey,
            String tag,
            String sql,
            long stopInMs,
            long startTime,
            long startOffset,
            long partitionDiscoveryIntervalMs,
            Boundedness boundedness,
            RocketMQDeserializationSchema<OUT> deserializationSchema,
            String cosumerOffsetMode,
            long consumerOffsetTimestamp) {
        // Tag filtering and SQL filtering are mutually exclusive.
        Validate.isTrue(
                !(StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(sql)),
                "Consumer tag and sql can not set value at the same time");
        this.pollTime = pollTime;
        this.topic = topic;
        this.consumerGroup = consumerGroup;
        this.nameServerAddress = nameServerAddress;
        this.accessKey = accessKey;
        this.secretKey = secretKey;
        this.tag = StringUtils.isEmpty(tag) ? RocketMQConfig.DEFAULT_CONSUMER_TAG : tag;
        this.sql = sql;
        this.stopInMs = stopInMs;
        this.startTime = startTime;
        // NOTE(review): a non-positive startOffset falls back to startTime, i.e. a value named as
        // a time is then used where an offset is expected — confirm this is intended.
        this.startOffset = startOffset > 0 ? startOffset : startTime;
        this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
        this.boundedness = boundedness;
        this.deserializationSchema = deserializationSchema;
        this.consumerOffsetMode = cosumerOffsetMode;
        this.consumerOffsetTimestamp = consumerOffsetTimestamp;
    }

    /** Returns the boundedness this source was constructed with. */
    @Override
    public Boundedness getBoundedness() {
        return this.boundedness;
    }

    /**
     * Creates the source reader for one parallel subtask.
     *
     * <p>The deserialization schema is opened with an initialization context backed by the given
     * reader context, then a {@link RocketMQSourceReader} is built around a supplier of {@link
     * RocketMQPartitionSplitReader}s configured with this source's settings.
     *
     * @param readerContext the context of the reader being created
     * @return a new source reader
     */
    @Override
    public SourceReader<OUT, RocketMQPartitionSplit> createReader(
            SourceReaderContext readerContext) {
        FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<OUT, Long, Long>>> elementsQueue =
                new FutureCompletingBlockingQueue<>();
        deserializationSchema.open(
                new DeserializationSchema.InitializationContext() {
                    @Override
                    public MetricGroup getMetricGroup() {
                        return readerContext.metricGroup();
                    }

                    @Override
                    public UserCodeClassLoader getUserCodeClassLoader() {
                        // Hand out the reader's class loader rather than null, so schemas that
                        // load user classes in open() do not fail with a NullPointerException.
                        return readerContext.getUserCodeClassLoader();
                    }
                });

        Supplier<SplitReader<Tuple3<OUT, Long, Long>, RocketMQPartitionSplit>> splitReaderSupplier =
                () ->
                        new RocketMQPartitionSplitReader<>(
                                pollTime,
                                topic,
                                consumerGroup,
                                nameServerAddress,
                                accessKey,
                                secretKey,
                                tag,
                                sql,
                                stopInMs,
                                startTime,
                                startOffset,
                                deserializationSchema);
        RocketMQRecordEmitter<OUT> recordEmitter = new RocketMQRecordEmitter<>();

        return new RocketMQSourceReader<>(
                elementsQueue,
                splitReaderSupplier,
                recordEmitter,
                new Configuration(),
                readerContext);
    }

    /**
     * Creates a fresh split enumerator configured with this source's settings.
     *
     * @param enumContext the context of the enumerator being created
     * @return a new enumerator without any restored assignment
     */
    @Override
    public SplitEnumerator<RocketMQPartitionSplit, RocketMQSourceEnumState> createEnumerator(
            SplitEnumeratorContext<RocketMQPartitionSplit> enumContext) {
        return new RocketMQSourceEnumerator(
                topic,
                consumerGroup,
                nameServerAddress,
                accessKey,
                secretKey,
                stopInMs,
                startOffset,
                partitionDiscoveryIntervalMs,
                boundedness,
                enumContext,
                consumerOffsetMode,
                consumerOffsetTimestamp);
    }

    /**
     * Creates a split enumerator from a checkpoint.
     *
     * <p>Identical to {@link #createEnumerator(SplitEnumeratorContext)} except that the current
     * assignment stored in {@code checkpoint} is additionally handed to the enumerator.
     *
     * @param enumContext the context of the enumerator being restored
     * @param checkpoint the enumerator state to restore from
     * @return a new enumerator seeded with the checkpointed assignment
     */
    @Override
    public SplitEnumerator<RocketMQPartitionSplit, RocketMQSourceEnumState> restoreEnumerator(
            SplitEnumeratorContext<RocketMQPartitionSplit> enumContext,
            RocketMQSourceEnumState checkpoint) {
        return new RocketMQSourceEnumerator(
                topic,
                consumerGroup,
                nameServerAddress,
                accessKey,
                secretKey,
                stopInMs,
                startOffset,
                partitionDiscoveryIntervalMs,
                boundedness,
                enumContext,
                checkpoint.getCurrentAssignment(),
                consumerOffsetMode,
                consumerOffsetTimestamp);
    }

    /** Returns a new serializer for {@link RocketMQPartitionSplit}s. */
    @Override
    public SimpleVersionedSerializer<RocketMQPartitionSplit> getSplitSerializer() {
        return new RocketMQPartitionSplitSerializer();
    }

    /** Returns a new serializer for the enumerator checkpoint state. */
    @Override
    public SimpleVersionedSerializer<RocketMQSourceEnumState> getEnumeratorCheckpointSerializer() {
        return new RocketMQSourceEnumStateSerializer();
    }

    /** Returns the type information reported by the deserialization schema. */
    @Override
    public TypeInformation<OUT> getProducedType() {
        return deserializationSchema.getProducedType();
    }
}