[#715] Support the RocketMQ TableSource based on the new Source interface (#716)
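For context: the patch below registers a DynamicTableSourceFactory under the connector identifier "rocketmq", so the new table source can be declared directly in SQL DDL. A minimal usage sketch (not part of the patch; the topic, group, address, and column names are hypothetical):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RocketMQTableSourceExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Connector identifier and option keys as defined by
        // RocketMQDynamicTableSourceFactory and RocketMQOptions in this patch.
        tableEnv.executeSql(
                "CREATE TABLE rocketmq_source (\n"
                        + "  `user_id` BIGINT,\n"
                        + "  `behavior` STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'rocketmq',\n"
                        + "  'topic' = 'user_behavior',\n"
                        + "  'consumerGroup' = 'behavior_consumer_group',\n"
                        + "  'nameServerAddress' = '127.0.0.1:9876'\n"
                        + ")");
        tableEnv.executeSql("SELECT * FROM rocketmq_source").print();
    }
}

By default the row deserializer splits each message body on the "\u0001" field delimiter and "\n" line delimiter declared in RocketMQOptions.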
diff --git a/rocketmq-flink/pom.xml b/rocketmq-flink/pom.xml
index d5fc49f..abec905 100644
--- a/rocketmq-flink/pom.xml
+++ b/rocketmq-flink/pom.xml
@@ -34,7 +34,7 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<rocketmq.version>4.7.1</rocketmq.version>
- <flink.version>1.12.2</flink.version>
+ <flink.version>1.13.0</flink.version>
<commons-lang.version>2.5</commons-lang.version>
<scala.binary.version>2.11</scala.binary.version>
<spotless.version>2.4.2</spotless.version>
@@ -78,6 +78,12 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-queryable-state-runtime_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
@@ -102,6 +108,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-test</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
<dependency>
<groupId>commons-lang</groupId>
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
index b899618..79a8149 100644
--- a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
@@ -24,7 +24,7 @@
import org.apache.rocketmq.flink.source.reader.RocketMQPartitionSplitReader;
import org.apache.rocketmq.flink.source.reader.RocketMQRecordEmitter;
import org.apache.rocketmq.flink.source.reader.RocketMQSourceReader;
-import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQRecordDeserializationSchema;
+import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema;
import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplitSerializer;
@@ -52,10 +52,11 @@
public class RocketMQSource<OUT>
implements Source<OUT, RocketMQPartitionSplit, RocketMQSourceEnumState>,
ResultTypeQueryable<OUT> {
- private static final long serialVersionUID = -6755372893283732098L;
+ private static final long serialVersionUID = -1L;
private final String topic;
private final String consumerGroup;
+ private final String nameServerAddress;
private final String tag;
private final long stopInMs;
private final long startTime;
@@ -64,20 +65,22 @@
// Boundedness
private final Boundedness boundedness;
- private final RocketMQRecordDeserializationSchema<OUT> deserializationSchema;
+ private final RocketMQDeserializationSchema<OUT> deserializationSchema;
public RocketMQSource(
String topic,
String consumerGroup,
+ String nameServerAddress,
String tag,
long stopInMs,
long startTime,
long startOffset,
long partitionDiscoveryIntervalMs,
Boundedness boundedness,
- RocketMQRecordDeserializationSchema<OUT> deserializationSchema) {
+ RocketMQDeserializationSchema<OUT> deserializationSchema) {
this.topic = topic;
this.consumerGroup = consumerGroup;
+ this.nameServerAddress = nameServerAddress;
this.tag = tag;
this.stopInMs = stopInMs;
this.startTime = startTime;
@@ -93,8 +96,8 @@
}
@Override
- public SourceReader<OUT, RocketMQPartitionSplit> createReader(SourceReaderContext readerContext)
- throws Exception {
+ public SourceReader<OUT, RocketMQPartitionSplit> createReader(
+ SourceReaderContext readerContext) {
FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<OUT, Long, Long>>> elementsQueue =
new FutureCompletingBlockingQueue<>();
deserializationSchema.open(
@@ -115,6 +118,7 @@
new RocketMQPartitionSplitReader<>(
topic,
consumerGroup,
+ nameServerAddress,
tag,
stopInMs,
startTime,
@@ -136,6 +140,7 @@
return new RocketMQSourceEnumerator(
topic,
consumerGroup,
+ nameServerAddress,
stopInMs,
startOffset,
partitionDiscoveryIntervalMs,
@@ -150,6 +155,7 @@
return new RocketMQSourceEnumerator(
topic,
consumerGroup,
+ nameServerAddress,
stopInMs,
startOffset,
partitionDiscoveryIntervalMs,
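For reference, a sketch (not part of this patch) of constructing the reworked source for the DataStream API with the new nameServerAddress argument; the endpoint, topic, and group values are placeholders, and SimpleStringDeserializationSchema is the hypothetical schema sketched after the RocketMQDeserializationSchema interface further down:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.rocketmq.flink.source.RocketMQSource;

public class RocketMQSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        RocketMQSource<String> source =
                new RocketMQSource<>(
                        "example-topic",           // topic (placeholder)
                        "example-consumer-group",  // consumerGroup (placeholder)
                        "127.0.0.1:9876",          // nameServerAddress (new in this patch)
                        "*",                       // tag: subscribe to all tags
                        Long.MAX_VALUE,            // stopInMs: never stop
                        -1L,                       // startTime: unused here
                        0L,                        // startOffset
                        30_000L,                   // partitionDiscoveryIntervalMs
                        Boundedness.CONTINUOUS_UNBOUNDED,
                        new SimpleStringDeserializationSchema());
        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "RocketMQ Source");
        stream.print();
        env.execute("rocketmq-source-example");
    }
}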
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java
new file mode 100644
index 0000000..064e193
--- /dev/null
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.common;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Configuration options for the RocketMQ connector. */
+public class RocketMQOptions {
+
+ public static final ConfigOption<String> TOPIC =
+ ConfigOptions.key("topic").stringType().noDefaultValue();
+
+ public static final ConfigOption<String> CONSUMER_GROUP =
+ ConfigOptions.key("consumerGroup").stringType().noDefaultValue();
+
+ public static final ConfigOption<String> NAME_SERVER_ADDRESS =
+ ConfigOptions.key("nameServerAddress").stringType().noDefaultValue();
+
+ public static final ConfigOption<String> OPTIONAL_TAG =
+ ConfigOptions.key("tag").stringType().noDefaultValue();
+
+ public static final ConfigOption<Integer> OPTIONAL_START_MESSAGE_OFFSET =
+ ConfigOptions.key("startMessageOffset").intType().defaultValue(-1);
+
+ public static final ConfigOption<Long> OPTIONAL_START_TIME_MILLS =
+ ConfigOptions.key("startTimeMs".toLowerCase()).longType().defaultValue(-1L);
+
+ public static final ConfigOption<String> OPTIONAL_START_TIME =
+ ConfigOptions.key("startTime".toLowerCase()).stringType().noDefaultValue();
+
+ public static final ConfigOption<String> OPTIONAL_END_TIME =
+ ConfigOptions.key("endTime").stringType().noDefaultValue();
+
+ public static final ConfigOption<String> OPTIONAL_TIME_ZONE =
+ ConfigOptions.key("timeZone".toLowerCase()).stringType().noDefaultValue();
+
+ public static final ConfigOption<Long> OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS =
+ ConfigOptions.key("partitionDiscoveryIntervalMs").longType().defaultValue(30000L);
+
+ public static final ConfigOption<String> OPTIONAL_ENCODING =
+ ConfigOptions.key("encoding").stringType().defaultValue("UTF-8");
+
+ public static final ConfigOption<String> OPTIONAL_FIELD_DELIMITER =
+ ConfigOptions.key("fieldDelimiter").stringType().defaultValue("\u0001");
+
+ public static final ConfigOption<String> OPTIONAL_LINE_DELIMITER =
+ ConfigOptions.key("lineDelimiter").stringType().defaultValue("\n");
+
+ public static final ConfigOption<Boolean> OPTIONAL_COLUMN_ERROR_DEBUG =
+ ConfigOptions.key("columnErrorDebug").booleanType().defaultValue(true);
+
+ public static final ConfigOption<String> OPTIONAL_LENGTH_CHECK =
+ ConfigOptions.key("lengthCheck").stringType().defaultValue("NONE");
+}
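A short sketch of how these options are consumed; it mirrors what RocketMQDynamicTableSourceFactory does further down, with placeholder values:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;

import static org.apache.rocketmq.flink.source.common.RocketMQOptions.CONSUMER_GROUP;
import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS;
import static org.apache.rocketmq.flink.source.common.RocketMQOptions.TOPIC;

public class RocketMQOptionsExample {
    public static void main(String[] args) {
        Map<String, String> rawProperties = new HashMap<>();
        rawProperties.put("topic", "example-topic");
        rawProperties.put("consumerGroup", "example-consumer-group");
        Configuration configuration = Configuration.fromMap(rawProperties);
        String topic = configuration.getString(TOPIC);
        String group = configuration.getString(CONSUMER_GROUP);
        // Optional keys fall back to their declared defaults, 30000 ms here.
        long discoveryIntervalMs =
                configuration.getLong(OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS);
        System.out.println(topic + " / " + group + " / " + discoveryIntervalMs);
    }
}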
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
index 08290c6..61b563a 100644
--- a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
@@ -19,7 +19,6 @@
package org.apache.rocketmq.flink.source.enumerator;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
-import org.apache.rocketmq.client.consumer.MQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
@@ -38,6 +37,7 @@
import javax.annotation.Nullable;
+import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -59,6 +59,8 @@
private final String topic;
/** The consumer group used for this RocketMQSource. */
private final String consumerGroup;
+ /** The name server address used for this RocketMQSource. */
+ private final String nameServerAddress;
/** The stop timestamp for this RocketMQSource. */
private final long stopInMs;
/** The start offset for this RocketMQSource. */
@@ -85,12 +87,13 @@
private final Map<Integer, Set<RocketMQPartitionSplit>> pendingPartitionSplitAssignment;
// Lazily instantiated or mutable fields.
- private MQPullConsumer consumer;
+ private DefaultMQPullConsumer consumer;
private boolean noMoreNewPartitionSplits = false;
public RocketMQSourceEnumerator(
String topic,
String consumerGroup,
+ String nameServerAddress,
long stopInMs,
long startOffset,
long partitionDiscoveryIntervalMs,
@@ -99,6 +102,7 @@
this(
topic,
consumerGroup,
+ nameServerAddress,
stopInMs,
startOffset,
partitionDiscoveryIntervalMs,
@@ -110,6 +114,7 @@
public RocketMQSourceEnumerator(
String topic,
String consumerGroup,
+ String nameServerAddress,
long stopInMs,
long startOffset,
long partitionDiscoveryIntervalMs,
@@ -118,6 +123,7 @@
Map<Integer, List<RocketMQPartitionSplit>> currentSplitsAssignments) {
this.topic = topic;
this.consumerGroup = consumerGroup;
+ this.nameServerAddress = nameServerAddress;
this.stopInMs = stopInMs;
this.startOffset = startOffset;
this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
@@ -180,7 +186,7 @@
}
@Override
- public RocketMQSourceEnumState snapshotState() {
+ public RocketMQSourceEnumState snapshotState(long checkpointId) {
return new RocketMQSourceEnumState(readerIdToSplitAssignments);
}
@@ -298,6 +304,14 @@
private void initialRocketMQConsumer() {
try {
consumer = new DefaultMQPullConsumer(consumerGroup);
+ consumer.setNamesrvAddr(nameServerAddress);
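+ // Build a per-process unique instance name (JVM name, topic, group, nano time)
+ // so pull consumers in the same JVM do not collide on one shared client instance.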
+ consumer.setInstanceName(
+ String.join(
+ "||",
+ ManagementFactory.getRuntimeMXBean().getName(),
+ topic,
+ consumerGroup,
+ "" + System.nanoTime()));
consumer.start();
} catch (MQClientException e) {
LOG.error("Failed to initial RocketMQ consumer.", e);
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
index 3bbeec8..41fbbea 100644
--- a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
@@ -19,13 +19,12 @@
package org.apache.rocketmq.flink.source.reader;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
-import org.apache.rocketmq.client.consumer.MQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQRecordDeserializationSchema;
+import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema;
import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -43,6 +42,7 @@
import javax.annotation.Nullable;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -70,12 +70,12 @@
private final long startTime;
private final long startOffset;
- private final RocketMQRecordDeserializationSchema<T> deserializationSchema;
+ private final RocketMQDeserializationSchema<T> deserializationSchema;
private final Map<Tuple3<String, String, Integer>, Long> startingOffsets;
private final Map<Tuple3<String, String, Integer>, Long> stoppingTimestamps;
private final SimpleCollector<T> collector;
- private MQPullConsumer consumer;
+ private DefaultMQPullConsumer consumer;
private volatile boolean wakeup = false;
@@ -84,11 +84,12 @@
public RocketMQPartitionSplitReader(
String topic,
String consumerGroup,
+ String nameServerAddress,
String tag,
long stopInMs,
long startTime,
long startOffset,
- RocketMQRecordDeserializationSchema<T> deserializationSchema) {
+ RocketMQDeserializationSchema<T> deserializationSchema) {
this.topic = topic;
this.tag = tag;
this.stopInMs = stopInMs;
@@ -98,7 +99,7 @@
this.startingOffsets = new HashMap<>();
this.stoppingTimestamps = new HashMap<>();
this.collector = new SimpleCollector<>();
- initialRocketMQConsumer(consumerGroup);
+ initialRocketMQConsumer(consumerGroup, nameServerAddress);
}
@Override
@@ -280,9 +281,17 @@
// --------------- private helper method ----------------------
- private void initialRocketMQConsumer(String consumerGroup) {
+ private void initialRocketMQConsumer(String consumerGroup, String nameServerAddress) {
try {
consumer = new DefaultMQPullConsumer(consumerGroup);
+ consumer.setNamesrvAddr(nameServerAddress);
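+ // Same instance-name uniqueness trick as in RocketMQSourceEnumerator.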
+ consumer.setInstanceName(
+ String.join(
+ "||",
+ ManagementFactory.getRuntimeMXBean().getName(),
+ topic,
+ consumerGroup,
+ "" + System.nanoTime()));
consumer.start();
} catch (MQClientException e) {
LOG.error("Failed to initial RocketMQ consumer.", e);
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/BytesMessage.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/BytesMessage.java
new file mode 100644
index 0000000..d109a7f
--- /dev/null
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/BytesMessage.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.reader.deserializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** A message wrapper carrying a byte-array payload and string properties. */
+public class BytesMessage {
+
+ private byte[] data;
+ private Map<String, String> properties = new HashMap<>();
+
+ public byte[] getData() {
+ return data;
+ }
+
+ public void setData(byte[] data) {
+ this.data = data;
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Map<String, String> props) {
+ this.properties = props;
+ }
+
+ public Object getProperty(String key) {
+ return properties.get(key);
+ }
+
+ public void setProperty(String key, String value) {
+ properties.put(key, value);
+ }
+}
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DeserializationSchema.java
similarity index 77%
copy from rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java
copy to rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DeserializationSchema.java
index 455f8af..3b087cc 100644
--- a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DeserializationSchema.java
@@ -1,7 +1,5 @@
package org.apache.rocketmq.flink.source.reader.deserializer;
-import org.apache.rocketmq.common.message.MessageExt;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -9,11 +7,9 @@
import java.io.IOException;
import java.io.Serializable;
-import java.util.List;
-/** An interface for the deserialization of RocketMQ records. */
-public interface RocketMQRecordDeserializationSchema<T>
- extends Serializable, ResultTypeQueryable<T> {
+/** An interface for the deserialization of records. */
+public interface DeserializationSchema<IN, OUT> extends Serializable, ResultTypeQueryable<OUT> {
/**
* Initialization method for the schema. It is called before the actual working methods {@link
@@ -35,9 +31,9 @@
* records can be buffered in memory or collecting records might delay emitting checkpoint
* barrier.
*
- * @param record The MessageExts to deserialize.
+ * @param record The record to deserialize.
* @param out The collector to put the resulting messages.
*/
@PublicEvolving
- void deserialize(List<MessageExt> record, Collector<T> out) throws IOException;
+ void deserialize(IN record, Collector<OUT> out) throws IOException;
}
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DirtyDataStrategy.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DirtyDataStrategy.java
new file mode 100644
index 0000000..06a0c2d
--- /dev/null
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DirtyDataStrategy.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.reader.deserializer;
+
+/** Strategies for handling dirty data. */
+public enum DirtyDataStrategy {
+ SKIP,
+ SKIP_SILENT,
+ CUT,
+ PAD,
+ NULL,
+ EXCEPTION
+}
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java
similarity index 83%
rename from rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java
rename to rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java
index 455f8af..6358e4c 100644
--- a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java
@@ -4,16 +4,14 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.util.Collector;
import java.io.IOException;
-import java.io.Serializable;
import java.util.List;
/** An interface for the deserialization of RocketMQ records. */
-public interface RocketMQRecordDeserializationSchema<T>
- extends Serializable, ResultTypeQueryable<T> {
+public interface RocketMQDeserializationSchema<T>
+ extends DeserializationSchema<List<MessageExt>, T> {
/**
* Initialization method for the schema. It is called before the actual working methods {@link
@@ -25,7 +23,7 @@
* @param context Contextual information that can be used during initialization.
*/
@PublicEvolving
- default void open(InitializationContext context) throws Exception {}
+ default void open(InitializationContext context) {}
/**
* Deserializes the byte message.
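A minimal sketch (not part of this patch) of implementing the renamed interface, decoding each message body as a UTF-8 string; the class name is hypothetical and is the schema referenced in the DataStream sketch above:

import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema;

public class SimpleStringDeserializationSchema implements RocketMQDeserializationSchema<String> {

    private static final long serialVersionUID = 1L;

    @Override
    public void deserialize(List<MessageExt> record, Collector<String> out) {
        // Emit one String per message, decoding the raw body as UTF-8.
        for (MessageExt message : record) {
            out.collect(new String(message.getBody(), StandardCharsets.UTF_8));
        }
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}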
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRowDeserializationSchema.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRowDeserializationSchema.java
new file mode 100644
index 0000000..5bd990e
--- /dev/null
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRowDeserializationSchema.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.reader.deserializer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.flink.source.reader.deserializer.RowDeserializationSchema.MetadataConverter;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link RocketMQDeserializationSchema} for {@link RowData} that converts each {@link
+ * MessageExt} into a {@link BytesMessage} and delegates row decoding to a {@link
+ * RowDeserializationSchema}.
+ */
+public class RocketMQRowDeserializationSchema implements RocketMQDeserializationSchema<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final RowDeserializationSchema deserializationSchema;
+
+ private transient List<BytesMessage> bytesMessages = new ArrayList<>(1);
+
+ public RocketMQRowDeserializationSchema(
+ TableSchema tableSchema,
+ Map<String, String> properties,
+ boolean hasMetadata,
+ MetadataConverter[] metadataConverters) {
+ deserializationSchema =
+ new RowDeserializationSchema.Builder()
+ .setProperties(properties)
+ .setTableSchema(tableSchema)
+ .setHasMetadata(hasMetadata)
+ .setMetadataConverters(metadataConverters)
+ .build();
+ }
+
+ @Override
+ public void open(InitializationContext context) {
+ deserializationSchema.open(context);
+ bytesMessages = new ArrayList<>();
+ }
+
+ @Override
+ public void deserialize(List<MessageExt> input, Collector<RowData> collector) {
+ extractMessages(input);
+ deserializationSchema.deserialize(bytesMessages, collector);
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return deserializationSchema.getProducedType();
+ }
+
+ private void extractMessages(List<MessageExt> messages) {
+ bytesMessages = new ArrayList<>(messages.size());
+ for (MessageExt message : messages) {
+ BytesMessage bytesMessage = new BytesMessage();
+ bytesMessage.setData(message.getBody());
+ if (message.getProperties() != null) {
+ bytesMessage.setProperties(message.getProperties());
+ }
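+ // Expose the message system fields as reserved "__xxx__" properties so that
+ // header columns declared in the table schema can read them by name.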
+ bytesMessage.setProperty("__topic__", message.getTopic());
+ bytesMessage.setProperty(
+ "__store_timestamp__", String.valueOf(message.getStoreTimestamp()));
+ bytesMessage.setProperty(
+ "__born_timestamp__", String.valueOf(message.getBornTimestamp()));
+ bytesMessage.setProperty("__queue_id__", String.valueOf(message.getQueueId()));
+ bytesMessage.setProperty("__queue_offset__", String.valueOf(message.getQueueOffset()));
+ bytesMessage.setProperty("__msg_id__", message.getMsgId());
+ bytesMessage.setProperty("__keys__", message.getKeys());
+ bytesMessage.setProperty("__tags__", message.getTags());
+ bytesMessages.add(bytesMessage);
+ }
+ }
+
+ @VisibleForTesting
+ public List<BytesMessage> getBytesMessages() {
+ return bytesMessages;
+ }
+}
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
new file mode 100644
index 0000000..f106693
--- /dev/null
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
@@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.reader.deserializer;
+
+import org.apache.rocketmq.flink.source.util.ByteSerializer;
+import org.apache.rocketmq.flink.source.util.ByteSerializer.ValueType;
+import org.apache.rocketmq.flink.source.util.StringSerializer;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A row-based {@link DeserializationSchema} that deserializes {@link BytesMessage}s into {@link RowData}.
+ */
+public class RowDeserializationSchema
+ implements DeserializationSchema<List<BytesMessage>, RowData> {
+
+ private static final long serialVersionUID = -1L;
+ private static final Logger logger = LoggerFactory.getLogger(RowDeserializationSchema.class);
+
+ private transient TableSchema tableSchema;
+ private final DirtyDataStrategy formatErrorStrategy;
+ private final DirtyDataStrategy fieldMissingStrategy;
+ private final DirtyDataStrategy fieldIncrementStrategy;
+ private final String encoding;
+ private final String fieldDelimiter;
+ private final String lineDelimiter;
+ private final boolean columnErrorDebug;
+ private final MetadataCollector metadataCollector;
+ private final int totalColumnSize;
+ private final int dataColumnSize;
+ private final ValueType[] fieldTypes;
+ private transient DataType[] fieldDataTypes;
+ private final Set<String> headerFields;
+ private final Map<String, String> properties;
+ private final Map<String, Integer> columnIndexMapping;
+ private final Map<Integer, Integer> dataIndexMapping;
+ private long lastLogExceptionTime;
+ private long lastLogHandleFieldTime;
+
+ private static final int DEFAULT_LOG_INTERVAL_MS = 60 * 1000;
+
+ public RowDeserializationSchema(
+ TableSchema tableSchema,
+ DirtyDataStrategy formatErrorStrategy,
+ DirtyDataStrategy fieldMissingStrategy,
+ DirtyDataStrategy fieldIncrementStrategy,
+ String encoding,
+ String fieldDelimiter,
+ String lineDelimiter,
+ boolean columnErrorDebug,
+ boolean hasMetadata,
+ MetadataConverter[] metadataConverters,
+ List<String> headerFields,
+ Map<String, String> properties) {
+ this.tableSchema = tableSchema;
+ this.formatErrorStrategy = formatErrorStrategy;
+ this.fieldMissingStrategy = fieldMissingStrategy;
+ this.fieldIncrementStrategy = fieldIncrementStrategy;
+ this.columnErrorDebug = columnErrorDebug;
+ this.encoding = encoding;
+ this.fieldDelimiter = StringEscapeUtils.unescapeJava(fieldDelimiter);
+ this.lineDelimiter = StringEscapeUtils.unescapeJava(lineDelimiter);
+ this.metadataCollector = new MetadataCollector(hasMetadata, metadataConverters);
+ this.headerFields = headerFields == null ? null : new HashSet<>(headerFields);
+ this.properties = properties;
+ this.totalColumnSize = tableSchema.getFieldNames().length;
+ int dataColumnSize = 0;
+ this.fieldTypes = new ValueType[totalColumnSize];
+ this.columnIndexMapping = new HashMap<>();
+ this.dataIndexMapping = new HashMap<>();
+ for (int index = 0; index < tableSchema.getFieldNames().length; index++) {
+ this.columnIndexMapping.put(tableSchema.getFieldNames()[index], index);
+ }
+ for (int index = 0; index < totalColumnSize; index++) {
+ ValueType type =
+ ByteSerializer.getTypeIndex(tableSchema.getFieldTypes()[index].getTypeClass());
+ this.fieldTypes[index] = type;
+ if (!isHeaderField(index)) {
+ dataIndexMapping.put(dataColumnSize, index);
+ dataColumnSize++;
+ }
+ }
+ this.dataColumnSize = dataColumnSize;
+ }
+
+ @Override
+ public void open(InitializationContext context) {
+ DescriptorProperties descriptorProperties = new DescriptorProperties();
+ descriptorProperties.putProperties(properties);
+ this.tableSchema = SchemaValidator.deriveTableSinkSchema(descriptorProperties);
+ this.fieldDataTypes = tableSchema.getFieldDataTypes();
+ this.lastLogExceptionTime = System.currentTimeMillis();
+ this.lastLogHandleFieldTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public void deserialize(List<BytesMessage> messages, Collector<RowData> collector) {
+ metadataCollector.collector = collector;
+ deserialize(messages, metadataCollector);
+ }
+
+ private void deserialize(List<BytesMessage> messages, MetadataCollector collector) {
+ if (messages == null || messages.isEmpty()) {
+ return;
+ }
+ for (BytesMessage message : messages) {
+ collector.message = message;
+ if (isOnlyHaveVarbinaryDataField()) {
+ GenericRowData rowData = new GenericRowData(totalColumnSize);
+ int dataIndex = dataIndexMapping.get(0);
+ rowData.setField(dataIndex, message.getData());
+ for (int index = 0; index < totalColumnSize; index++) {
+ if (index == dataIndex) {
+ continue;
+ }
+ String headerValue = getHeaderValue(message, index);
+ rowData.setField(
+ index,
+ StringSerializer.deserialize(
+ headerValue,
+ fieldTypes[index],
+ fieldDataTypes[index],
+ new HashSet<>()));
+ }
+ collector.collect(rowData);
+ } else if (isAllHeaderField()) {
+ GenericRowData rowData = new GenericRowData(totalColumnSize);
+ for (int index = 0; index < totalColumnSize; index++) {
+ String headerValue = getHeaderValue(message, index);
+ rowData.setField(
+ index,
+ StringSerializer.deserialize(
+ headerValue,
+ fieldTypes[index],
+ fieldDataTypes[index],
+ new HashSet<>()));
+ }
+ collector.collect(rowData);
+ } else {
+ if (message.getData() == null) {
+ logger.info("Deserialize empty BytesMessage body, ignore the empty message.");
+ return;
+ }
+ deserializeBytesMessage(message, collector);
+ }
+ }
+ }
+
+ private boolean isOnlyHaveVarbinaryDataField() {
+ if (dataColumnSize == 1 && dataIndexMapping.size() == 1) {
+ int index = dataIndexMapping.get(0);
+ return isByteArrayType(tableSchema.getFieldNames()[index]);
+ }
+ return false;
+ }
+
+ private boolean isAllHeaderField() {
+ return null != headerFields && headerFields.size() == tableSchema.getFieldNames().length;
+ }
+
+ private void deserializeBytesMessage(BytesMessage message, Collector<RowData> collector) {
+ String body;
+ try {
+ body = new String(message.getData(), encoding);
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ String[] lines = StringUtils.split(body, lineDelimiter);
+ for (String line : lines) {
+ String[] data = StringUtils.splitPreserveAllTokens(line, fieldDelimiter);
+ if (dataColumnSize == 1) {
+ data = new String[1];
+ data[0] = line;
+ }
+ if (data.length < dataColumnSize) {
+ data = handleFieldMissing(data);
+ } else if (data.length > dataColumnSize) {
+ data = handleFieldIncrement(data);
+ }
+ if (data == null) {
+ continue;
+ }
+ GenericRowData rowData = new GenericRowData(totalColumnSize);
+ boolean skip = false;
+ for (int index = 0; index < totalColumnSize; index++) {
+ try {
+ String fieldValue = getValue(message, data, line, index);
+ rowData.setField(
+ index,
+ StringSerializer.deserialize(
+ fieldValue,
+ fieldTypes[index],
+ fieldDataTypes[index],
+ new HashSet<>()));
+ } catch (Exception e) {
+ skip = handleException(rowData, index, data, e);
+ }
+ }
+ if (skip) {
+ continue;
+ }
+ collector.collect(rowData);
+ }
+ }
+
+ private boolean isHeaderField(int index) {
+ return headerFields != null && headerFields.contains(tableSchema.getFieldNames()[index]);
+ }
+
+ private String getHeaderValue(BytesMessage message, int index) {
+ Object object = message.getProperty(tableSchema.getFieldNames()[index]);
+ return object == null ? "" : (String) object;
+ }
+
+ private String getValue(BytesMessage message, String[] data, String line, int index) {
+ String fieldValue = null;
+ if (isHeaderField(index)) {
+ fieldValue = getHeaderValue(message, index);
+ } else {
+ if (dataColumnSize == 1) {
+ fieldValue = line;
+ } else {
+ if (index < data.length) {
+ fieldValue = data[index];
+ }
+ }
+ }
+
+ return fieldValue;
+ }
+
+ private boolean isByteArrayType(String fieldName) {
+ TypeInformation<?> typeInformation =
+ tableSchema.getFieldTypes()[columnIndexMapping.get(fieldName)];
+ if (typeInformation != null) {
+ ValueType valueType = ByteSerializer.getTypeIndex(typeInformation.getTypeClass());
+ return valueType == ValueType.V_ByteArray;
+ }
+ return false;
+ }
+
+ private boolean handleException(GenericRowData row, int index, Object[] data, Exception e) {
+ boolean skip = false;
+ switch (formatErrorStrategy) {
+ case SKIP:
+ long now = System.currentTimeMillis();
+ if (columnErrorDebug || now - lastLogExceptionTime > DEFAULT_LOG_INTERVAL_MS) {
+ logger.warn(
+ "Data format error, field type: "
+ + fieldTypes[index]
+ + "field data: "
+ + data[index]
+ + ", index: "
+ + index
+ + ", data: ["
+ + StringUtils.join(data, ",")
+ + "]",
+ e);
+ lastLogExceptionTime = now;
+ }
+ skip = true;
+ break;
+ case SKIP_SILENT:
+ skip = true;
+ break;
+ default:
+ case CUT:
+ case NULL:
+ case PAD:
+ row.setField(index, null);
+ break;
+ case EXCEPTION:
+ throw new RuntimeException(e);
+ }
+
+ return skip;
+ }
+
+ private String[] handleFieldMissing(String[] data) {
+ switch (fieldMissingStrategy) {
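+ // Note: "default" deliberately falls through to SKIP, so unknown strategies are treated as SKIP.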
+ default:
+ case SKIP:
+ long now = System.currentTimeMillis();
+ if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
+ logger.warn(
+ "Field missing error, table column number: "
+ + totalColumnSize
+ + ", data column number: "
+ + dataColumnSize
+ + ", data field number: "
+ + data.length
+ + ", data: ["
+ + StringUtils.join(data, ",")
+ + "]");
+ lastLogHandleFieldTime = now;
+ }
+ return null;
+ case SKIP_SILENT:
+ return null;
+ case CUT:
+ case NULL:
+ case PAD:
+ {
+ String[] res = new String[totalColumnSize];
+ for (int i = 0; i < data.length; ++i) {
+ Object dataIndex = dataIndexMapping.get(i);
+ if (dataIndex != null) {
+ res[(int) dataIndex] = data[i];
+ }
+ }
+ return res;
+ }
+ case EXCEPTION:
+ throw new RuntimeException();
+ }
+ }
+
+ private String[] handleFieldIncrement(String[] data) {
+ switch (fieldIncrementStrategy) {
+ case SKIP:
+ long now = System.currentTimeMillis();
+ if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
+ logger.warn(
+ "Field increment error, table column number: "
+ + totalColumnSize
+ + ", data column number: "
+ + dataColumnSize
+ + ", data field number: "
+ + data.length
+ + ", data: ["
+ + StringUtils.join(data, ",")
+ + "]");
+ lastLogHandleFieldTime = now;
+ }
+ return null;
+ case SKIP_SILENT:
+ return null;
+ default:
+ case CUT:
+ case NULL:
+ case PAD:
+ {
+ String[] res = new String[totalColumnSize];
+ for (int i = 0; i < dataColumnSize; ++i) {
+ Object dataIndex = dataIndexMapping.get(i);
+ if (dataIndex != null) {
+ res[(int) dataIndex] = data[i];
+ }
+ }
+ return res;
+ }
+ case EXCEPTION:
+ throw new RuntimeException();
+ }
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return InternalTypeInfo.of((RowType) tableSchema.toRowDataType().getLogicalType());
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /** Converter that reads one metadata value from a {@link BytesMessage}. */
+ public interface MetadataConverter extends Serializable {
+ Object read(BytesMessage message);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /** A {@link Collector} that appends the configured metadata columns to each physical {@link RowData}. */
+ public static final class MetadataCollector implements Collector<RowData>, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean hasMetadata;
+ private final MetadataConverter[] metadataConverters;
+
+ public transient BytesMessage message;
+ public transient Collector<RowData> collector;
+
+ public MetadataCollector(boolean hasMetadata, MetadataConverter[] metadataConverters) {
+ this.hasMetadata = hasMetadata;
+ this.metadataConverters = metadataConverters;
+ }
+
+ @Override
+ public void collect(RowData physicalRow) {
+ if (hasMetadata) {
+ final int physicalArity = physicalRow.getArity();
+ final int metadataArity = metadataConverters.length;
+ final GenericRowData producedRow =
+ new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
+ final GenericRowData genericPhysicalRow = (GenericRowData) physicalRow;
+ for (int index = 0; index < physicalArity; index++) {
+ producedRow.setField(index, genericPhysicalRow.getField(index));
+ }
+ for (int index = 0; index < metadataArity; index++) {
+ producedRow.setField(
+ index + physicalArity, metadataConverters[index].read(message));
+ }
+ collector.collect(producedRow);
+ } else {
+ collector.collect(physicalRow);
+ }
+ }
+
+ @Override
+ public void close() {
+ // nothing to do
+ }
+ }
+
+ /** Builder of {@link RowDeserializationSchema}. */
+ public static class Builder {
+
+ private TableSchema schema;
+ private DirtyDataStrategy formatErrorStrategy = DirtyDataStrategy.SKIP;
+ private DirtyDataStrategy fieldMissingStrategy = DirtyDataStrategy.SKIP;
+ private DirtyDataStrategy fieldIncrementStrategy = DirtyDataStrategy.CUT;
+ private String encoding = "UTF-8";
+ private String lineDelimiter = "\n";
+ private String fieldDelimiter = "\u0001";
+ private boolean columnErrorDebug = false;
+ private boolean hasMetadata;
+ private MetadataConverter[] metadataConverters;
+ private List<String> headerFields;
+ private Map<String, String> properties;
+
+ public Builder() {}
+
+ public Builder setTableSchema(TableSchema tableSchema) {
+ this.schema = tableSchema;
+ return this;
+ }
+
+ public Builder setFormatErrorStrategy(DirtyDataStrategy formatErrorStrategy) {
+ this.formatErrorStrategy = formatErrorStrategy;
+ return this;
+ }
+
+ public Builder setFieldMissingStrategy(DirtyDataStrategy fieldMissingStrategy) {
+ this.fieldMissingStrategy = fieldMissingStrategy;
+ return this;
+ }
+
+ public Builder setFieldIncrementStrategy(DirtyDataStrategy fieldIncrementStrategy) {
+ this.fieldIncrementStrategy = fieldIncrementStrategy;
+ return this;
+ }
+
+ public Builder setEncoding(String encoding) {
+ this.encoding = encoding;
+ return this;
+ }
+
+ public Builder setFieldDelimiter(String fieldDelimiter) {
+ this.fieldDelimiter = fieldDelimiter;
+ return this;
+ }
+
+ public Builder setLineDelimiter(String lineDelimiter) {
+ this.lineDelimiter = lineDelimiter;
+ return this;
+ }
+
+ public Builder setColumnErrorDebug(boolean columnErrorDebug) {
+ this.columnErrorDebug = columnErrorDebug;
+ return this;
+ }
+
+ public Builder setHasMetadata(boolean hasMetadata) {
+ this.hasMetadata = hasMetadata;
+ return this;
+ }
+
+ public Builder setMetadataConverters(MetadataConverter[] metadataConverters) {
+ this.metadataConverters = metadataConverters;
+ return this;
+ }
+
+ public Builder setHeaderFields(List<String> headerFields) {
+ this.headerFields = headerFields;
+ return this;
+ }
+
+ public Builder setProperties(Map<String, String> properties) {
+ this.properties = properties;
+ if (null == properties) {
+ return this;
+ }
+ Configuration configuration = new Configuration();
+ for (String key : properties.keySet()) {
+ configuration.setString(key, properties.get(key));
+ }
+ String lengthCheck = configuration.get(CollectorOption.LENGTH_CHECK);
+ switch (lengthCheck.toUpperCase()) {
+ case "SKIP":
+ {
+ this.setFormatErrorStrategy(DirtyDataStrategy.SKIP);
+ this.setFieldMissingStrategy(DirtyDataStrategy.SKIP);
+ this.setFieldIncrementStrategy(DirtyDataStrategy.SKIP);
+ }
+ break;
+ case "PAD":
+ {
+ this.setFormatErrorStrategy(DirtyDataStrategy.SKIP);
+ this.setFieldMissingStrategy(DirtyDataStrategy.PAD);
+ this.setFieldIncrementStrategy(DirtyDataStrategy.CUT);
+ }
+ break;
+ case "EXCEPTION":
+ {
+ this.setFormatErrorStrategy(DirtyDataStrategy.EXCEPTION);
+ this.setFieldMissingStrategy(DirtyDataStrategy.EXCEPTION);
+ this.setFieldIncrementStrategy(DirtyDataStrategy.EXCEPTION);
+ }
+ break;
+ case "SKIP_SILENT":
+ {
+ this.setFormatErrorStrategy(DirtyDataStrategy.SKIP_SILENT);
+ this.setFieldMissingStrategy(DirtyDataStrategy.SKIP_SILENT);
+ this.setFieldIncrementStrategy(DirtyDataStrategy.SKIP_SILENT);
+ }
+ break;
+ default:
+ }
+ this.setEncoding(configuration.getString(CollectorOption.ENCODING));
+ this.setFieldDelimiter(configuration.getString(CollectorOption.FIELD_DELIMITER));
+ this.setLineDelimiter(configuration.getString(CollectorOption.LINE_DELIMITER));
+ this.setColumnErrorDebug(configuration.getBoolean(CollectorOption.COLUMN_ERROR_DEBUG));
+ return this;
+ }
+
+ public RowDeserializationSchema build() {
+ return new RowDeserializationSchema(
+ schema,
+ formatErrorStrategy,
+ fieldMissingStrategy,
+ fieldIncrementStrategy,
+ encoding,
+ fieldDelimiter,
+ lineDelimiter,
+ columnErrorDebug,
+ hasMetadata,
+ metadataConverters,
+ headerFields,
+ properties);
+ }
+ }
+
+ /** Options for {@link RowDeserializationSchema}. */
+ public static class CollectorOption {
+ public static final ConfigOption<String> ENCODING =
+ ConfigOptions.key("encoding".toLowerCase()).defaultValue("UTF-8");
+ public static final ConfigOption<String> FIELD_DELIMITER =
+ ConfigOptions.key("fieldDelimiter".toLowerCase()).defaultValue("\u0001");
+ public static final ConfigOption<String> LINE_DELIMITER =
+ ConfigOptions.key("lineDelimiter".toLowerCase()).defaultValue("\n");
+ public static final ConfigOption<Boolean> COLUMN_ERROR_DEBUG =
+ ConfigOptions.key("columnErrorDebug".toLowerCase()).defaultValue(true);
+ public static final ConfigOption<String> LENGTH_CHECK =
+ ConfigOptions.key("lengthCheck".toLowerCase()).defaultValue("NONE");
+ }
+}
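A small sketch of the Builder path above, assuming a single-column schema. Note that the CollectorOption keys are lower-cased, so the property key must be spelled "lengthcheck" when calling the builder directly:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.rocketmq.flink.source.reader.deserializer.RowDeserializationSchema;

public class RowDeserializationSchemaBuilderExample {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // "PAD" maps to: format errors -> SKIP, missing fields -> PAD, extra fields -> CUT.
        props.put("lengthcheck", "PAD");
        RowDeserializationSchema schema =
                new RowDeserializationSchema.Builder()
                        .setTableSchema(
                                TableSchema.builder().field("f0", DataTypes.STRING()).build())
                        .setProperties(props)
                        .build();
        System.out.println(schema.getProducedType());
    }
}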
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
new file mode 100644
index 0000000..ec41fc6
--- /dev/null
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.commons.lang3.time.FastDateFormat;
+
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.CONSUMER_GROUP;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.NAME_SERVER_ADDRESS;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_COLUMN_ERROR_DEBUG;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_ENCODING;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_END_TIME;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_FIELD_DELIMITER;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_LENGTH_CHECK;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_LINE_DELIMITER;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_START_MESSAGE_OFFSET;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_START_TIME;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_START_TIME_MILLS;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_TAG;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_TIME_ZONE;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.TOPIC;
+
+/**
+ * Defines the {@link DynamicTableSourceFactory} implementation to create {@link
+ * RocketMQScanTableSource}.
+ */
+public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFactory {
+
+ private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+
+ @Override
+ public String factoryIdentifier() {
+ return "rocketmq";
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> requiredOptions = new HashSet<>();
+ requiredOptions.add(TOPIC);
+ requiredOptions.add(CONSUMER_GROUP);
+ requiredOptions.add(NAME_SERVER_ADDRESS);
+ return requiredOptions;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> optionalOptions = new HashSet<>();
+ optionalOptions.add(OPTIONAL_TAG);
+ optionalOptions.add(OPTIONAL_START_MESSAGE_OFFSET);
+ optionalOptions.add(OPTIONAL_START_TIME_MILLS);
+ optionalOptions.add(OPTIONAL_START_TIME);
+ optionalOptions.add(OPTIONAL_END_TIME);
+ optionalOptions.add(OPTIONAL_TIME_ZONE);
+ optionalOptions.add(OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS);
+ optionalOptions.add(OPTIONAL_ENCODING);
+ optionalOptions.add(OPTIONAL_FIELD_DELIMITER);
+ optionalOptions.add(OPTIONAL_LINE_DELIMITER);
+ optionalOptions.add(OPTIONAL_COLUMN_ERROR_DEBUG);
+ optionalOptions.add(OPTIONAL_LENGTH_CHECK);
+ return optionalOptions;
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ transformContext(this, context);
+ FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
+ helper.validate();
+ Map<String, String> rawProperties = context.getCatalogTable().getOptions();
+ Configuration configuration = Configuration.fromMap(rawProperties);
+ String topic = configuration.getString(TOPIC);
+ String consumerGroup = configuration.getString(CONSUMER_GROUP);
+ String nameServerAddress = configuration.getString(NAME_SERVER_ADDRESS);
+ String tag = configuration.getString(OPTIONAL_TAG);
+ int startMessageOffset = configuration.getInteger(OPTIONAL_START_MESSAGE_OFFSET);
+ long startTimeMs = configuration.getLong(OPTIONAL_START_TIME_MILLS);
+ String startDateTime = configuration.getString(OPTIONAL_START_TIME);
+ String timeZone = configuration.getString(OPTIONAL_TIME_ZONE);
+ long startTime = startTimeMs;
+ if (startTime == -1) {
+ if (!StringUtils.isNullOrWhitespaceOnly(startDateTime)) {
+ try {
+ startTime = parseDateString(startDateTime, timeZone);
+ } catch (ParseException e) {
+ throw new RuntimeException(
+ String.format(
+ "Incorrect datetime format: %s, pls use ISO-8601 "
+ + "complete date plus hours, minutes and seconds format:%s.",
+ startDateTime, DATE_FORMAT),
+ e);
+ }
+ }
+ }
+ long stopInMs = Long.MAX_VALUE;
+ String endDateTime = configuration.getString(OPTIONAL_END_TIME);
+ if (!StringUtils.isNullOrWhitespaceOnly(endDateTime)) {
+ try {
+ stopInMs = parseDateString(endDateTime, timeZone);
+ } catch (ParseException e) {
+ throw new RuntimeException(
+ String.format(
+ "Incorrect datetime format: %s, pls use ISO-8601 "
+ + "complete date plus hours, minutes and seconds format:%s.",
+ endDateTime, DATE_FORMAT),
+ e);
+ }
+ Preconditions.checkArgument(
stopInMs >= startTime, "Start time must not be later than stop time.");
+ }
+ long partitionDiscoveryIntervalMs =
+ configuration.getLong(OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS);
+ DescriptorProperties descriptorProperties = new DescriptorProperties();
+ descriptorProperties.putProperties(rawProperties);
+ TableSchema physicalSchema =
+ TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+ descriptorProperties.putTableSchema("schema", physicalSchema);
+ return new RocketMQScanTableSource(
+ descriptorProperties,
+ physicalSchema,
+ topic,
+ consumerGroup,
+ nameServerAddress,
+ tag,
+ stopInMs,
+ startMessageOffset,
+ startMessageOffset < 0 ? startTime : -1L,
+ partitionDiscoveryIntervalMs);
+ }
+
+ private void transformContext(
+ DynamicTableFactory factory, DynamicTableFactory.Context context) {
+ Map<String, String> catalogOptions = context.getCatalogTable().getOptions();
+ Map<String, String> convertedOptions =
+ normalizeOptionCaseAsFactory(factory, catalogOptions);
+ catalogOptions.clear();
+ for (Map.Entry<String, String> entry : convertedOptions.entrySet()) {
+ catalogOptions.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ private Map<String, String> normalizeOptionCaseAsFactory(
+ Factory factory, Map<String, String> options) {
+ Map<String, String> normalizedOptions = new HashMap<>();
+ Map<String, String> requiredOptionKeysLowerCaseToOriginal =
+ factory.requiredOptions().stream()
+ .collect(
+ Collectors.toMap(
+ option -> option.key().toLowerCase(), ConfigOption::key));
+ Map<String, String> optionalOptionKeysLowerCaseToOriginal =
+ factory.optionalOptions().stream()
+ .collect(
+ Collectors.toMap(
+ option -> option.key().toLowerCase(), ConfigOption::key));
+ for (Map.Entry<String, String> entry : options.entrySet()) {
+ final String catalogOptionKey = entry.getKey();
+ final String catalogOptionValue = entry.getValue();
+ normalizedOptions.put(
+ requiredOptionKeysLowerCaseToOriginal.containsKey(
+ catalogOptionKey.toLowerCase())
+ ? requiredOptionKeysLowerCaseToOriginal.get(
+ catalogOptionKey.toLowerCase())
+ : optionalOptionKeysLowerCaseToOriginal.getOrDefault(
+ catalogOptionKey.toLowerCase(), catalogOptionKey),
+ catalogOptionValue);
+ }
+ return normalizedOptions;
+ }
+
+ private Long parseDateString(String dateString, String timeZone) throws ParseException {
+ FastDateFormat simpleDateFormat =
+ FastDateFormat.getInstance(DATE_FORMAT, TimeZone.getTimeZone(timeZone));
+ return simpleDateFormat.parse(dateString).getTime();
+ }
+}
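A small sketch of the datetime handling above: "startTime" and "endTime" values must match the "yyyy-MM-dd HH:mm:ss" pattern and are interpreted in the configured "timeZone" (the values below are placeholders):

import java.text.ParseException;
import java.util.TimeZone;
import org.apache.commons.lang3.time.FastDateFormat;

public class DateOptionExample {
    public static void main(String[] args) throws ParseException {
        FastDateFormat format =
                FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("GMT+8"));
        // The parsed epoch milliseconds become the source's start/stop boundary.
        long startMs = format.parse("2021-05-01 00:00:00").getTime();
        System.out.println(startMs);
    }
}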
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
new file mode 100644
index 0000000..37ab6a5
--- /dev/null
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.table;
+
+import org.apache.rocketmq.flink.source.RocketMQSource;
+import org.apache.rocketmq.flink.source.reader.deserializer.BytesMessage;
+import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema;
+import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQRowDeserializationSchema;
+import org.apache.rocketmq.flink.source.reader.deserializer.RowDeserializationSchema.MetadataConverter;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.flink.api.connector.source.Boundedness.BOUNDED;
+import static org.apache.flink.api.connector.source.Boundedness.CONTINUOUS_UNBOUNDED;
+
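+/*
+ * Usage sketch in Flink SQL. The connector identifier and option keys below are
+ * illustrative assumptions only; the authoritative names are the ones declared by
+ * RocketMQDynamicTableSourceFactory and its ConfigOptions:
+ *
+ * CREATE TABLE rocketmq_source (
+ *     `user_id`  BIGINT,
+ *     `behavior` STRING,
+ *     `topic`    STRING METADATA VIRTUAL  -- served by ReadableMetadata below
+ * ) WITH (
+ *     'connector' = 'rocketmq',
+ *     'topic' = 'user-behavior',
+ *     'consumerGroup' = 'behavior-group',
+ *     'nameServerAddress' = '127.0.0.1:9876'
+ * );
+ */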
+/** Defines the scan table source of RocketMQ. */
+public class RocketMQScanTableSource implements ScanTableSource, SupportsReadingMetadata {
+
+ private final DescriptorProperties properties;
+ private final TableSchema schema;
+
+ private final String topic;
+ private final String consumerGroup;
+ private final String nameServerAddress;
+ private final String tag;
+
+ private final long stopInMs;
+ private final long partitionDiscoveryIntervalMs;
+ private final long startMessageOffset;
+ private final long startTime;
+
+ private List<String> metadataKeys;
+
+ public RocketMQScanTableSource(
+ DescriptorProperties properties,
+ TableSchema schema,
+ String topic,
+ String consumerGroup,
+ String nameServerAddress,
+ String tag,
+ long stopInMs,
+ long startMessageOffset,
+ long startTime,
+ long partitionDiscoveryIntervalMs) {
+ this.properties = properties;
+ this.schema = schema;
+ this.topic = topic;
+ this.consumerGroup = consumerGroup;
+ this.nameServerAddress = nameServerAddress;
+ this.tag = tag;
+ this.stopInMs = stopInMs;
+ this.startMessageOffset = startMessageOffset;
+ this.startTime = startTime;
+ this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
+ this.metadataKeys = Collections.emptyList();
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+ return SourceProvider.of(
+ new RocketMQSource<>(
+ topic,
+ consumerGroup,
+ nameServerAddress,
+ tag,
+ stopInMs,
+ startTime,
+ startMessageOffset < 0 ? 0 : startMessageOffset,
+ partitionDiscoveryIntervalMs,
+ isBounded() ? BOUNDED : CONTINUOUS_UNBOUNDED,
+ createDeserializationSchema()));
+ }
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+ Stream.of(ReadableMetadata.values())
+ .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
+ return metadataMap;
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+ this.metadataKeys = metadataKeys;
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ RocketMQScanTableSource tableSource =
+ new RocketMQScanTableSource(
+ properties,
+ schema,
+ topic,
+ consumerGroup,
+ nameServerAddress,
+ tag,
+ stopInMs,
+ startMessageOffset,
+ startTime,
+ partitionDiscoveryIntervalMs);
+ tableSource.metadataKeys = metadataKeys;
+ return tableSource;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "RocketMQScanTableSource";
+ }
+
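+    /**
+     * Builds the row deserializer; metadata converters are resolved in the same order as
+     * {@code metadataKeys} so that metadata columns line up with the produced row type.
+     */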
+ private RocketMQDeserializationSchema<RowData> createDeserializationSchema() {
+ final MetadataConverter[] metadataConverters =
+ metadataKeys.stream()
+ .map(
+ k ->
+ Stream.of(ReadableMetadata.values())
+ .filter(rm -> rm.key.equals(k))
+ .findFirst()
+ .orElseThrow(IllegalStateException::new))
+ .map(m -> m.converter)
+ .toArray(MetadataConverter[]::new);
+ return new RocketMQRowDeserializationSchema(
+                schema, properties.asMap(), !metadataKeys.isEmpty(), metadataConverters);
+ }
+
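+    /** The source is bounded iff an explicit stop timestamp was configured. */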
+ private boolean isBounded() {
+ return stopInMs != Long.MAX_VALUE;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Metadata handling
+ // --------------------------------------------------------------------------------------------
+
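+    /** Metadata fields of a RocketMQ message that can be exposed as virtual columns. */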
+ enum ReadableMetadata {
+ TOPIC(
+ "topic",
+ DataTypes.STRING().notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(BytesMessage message) {
+ return StringData.fromString(
+ String.valueOf(message.getProperty("__topic__")));
+ }
+ });
+
+ final String key;
+
+ final DataType dataType;
+
+ final MetadataConverter converter;
+
+ ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
+ this.key = key;
+ this.dataType = dataType;
+ this.converter = converter;
+ }
+ }
+}
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/util/ByteSerializer.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/util/ByteSerializer.java
new file mode 100644
index 0000000..358cb84
--- /dev/null
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/util/ByteSerializer.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.util;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.Period;
+
+/** ByteSerializer deserializes individual fields from their byte-array representations. */
+public class ByteSerializer {
+
+ public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
+
+ public static Object deserialize(byte[] value, ValueType type) {
+ return deserialize(value, type, DEFAULT_CHARSET);
+ }
+
+ public static Object deserialize(byte[] value, ValueType type, Charset charset) {
+ switch (type) {
+ case V_String:
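+                // note: a null payload maps to the empty string rather than null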
+ return null == value ? "" : new String(value, charset);
+ case V_Timestamp: // sql.Timestamp encoded as long
+ return new Timestamp(ByteUtils.toLong(value));
+ case V_Date: // sql.Date encoded as long
+ return new Date(ByteUtils.toLong(value));
+ case V_Time: // sql.Time encoded as long
+ return new Time(ByteUtils.toLong(value));
+ case V_BigDecimal:
+ return ByteUtils.toBigDecimal(value);
+ default:
+ return commonDeserialize(value, type);
+ }
+ }
+
+ private static Object commonDeserialize(byte[] value, ValueType type) {
+ switch (type) {
+ case V_ByteArray: // byte[]
+ return value;
+ case V_Byte: // byte
+ return null == value ? (byte) '\0' : value[0];
+ case V_Short:
+ return ByteUtils.toShort(value);
+ case V_Integer:
+ return ByteUtils.toInt(value);
+ case V_Long:
+ return ByteUtils.toLong(value);
+ case V_Float:
+ return ByteUtils.toFloat(value);
+ case V_Double:
+ return ByteUtils.toDouble(value);
+ case V_Boolean:
+ return ByteUtils.toBoolean(value);
+ case V_BigInteger:
+ return new BigInteger(value);
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
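+    /**
+     * Maps a Java class to its ValueType. The temporal types from V_LocalDateTime onward are
+     * classified here but are not yet handled by deserialize(), which rejects them with an
+     * IllegalArgumentException.
+     */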
+ public static ValueType getTypeIndex(Class<?> clazz) {
+ if (byte[].class.equals(clazz)) {
+ return ValueType.V_ByteArray;
+ } else if (String.class.equals(clazz)) {
+ return ValueType.V_String;
+ } else if (Byte.class.equals(clazz)) {
+ return ValueType.V_Byte;
+ } else if (Short.class.equals(clazz)) {
+ return ValueType.V_Short;
+ } else if (Integer.class.equals(clazz)) {
+ return ValueType.V_Integer;
+ } else if (Long.class.equals(clazz)) {
+ return ValueType.V_Long;
+ } else if (Float.class.equals(clazz)) {
+ return ValueType.V_Float;
+ } else if (Double.class.equals(clazz)) {
+ return ValueType.V_Double;
+ } else if (Boolean.class.equals(clazz)) {
+ return ValueType.V_Boolean;
+ } else if (Timestamp.class.equals(clazz)) {
+ return ValueType.V_Timestamp;
+ } else if (Date.class.equals(clazz)) {
+ return ValueType.V_Date;
+ } else if (Time.class.equals(clazz)) {
+ return ValueType.V_Time;
+ } else if (BigDecimal.class.equals(clazz)) {
+ return ValueType.V_BigDecimal;
+ } else if (BigInteger.class.equals(clazz)) {
+ return ValueType.V_BigInteger;
+ } else if (LocalDateTime.class.equals(clazz)) {
+ return ValueType.V_LocalDateTime;
+ } else if (LocalDate.class.equals(clazz)) {
+ return ValueType.V_LocalDate;
+ } else if (Duration.class.equals(clazz)) {
+ return ValueType.V_Duration;
+ } else if (LocalTime.class.equals(clazz)) {
+ return ValueType.V_LocalTime;
+ } else if (Period.class.equals(clazz)) {
+ return ValueType.V_Period;
+ } else if (OffsetDateTime.class.equals(clazz)) {
+ return ValueType.V_OffsetDateTime;
+ } else {
+ return ValueType.Unsupported;
+ }
+ }
+
+ /** Value Type. */
+ public enum ValueType {
+ V_ByteArray,
+ V_String,
+ V_Byte,
+ V_Short,
+ V_Integer,
+ V_Long,
+ V_Float,
+ V_Double,
+ V_Boolean,
+ V_Timestamp,
+ V_Date,
+ V_Time,
+ V_BigDecimal,
+ V_BigInteger,
+ V_LocalDateTime,
+ V_LocalDate,
+ V_Duration,
+ V_LocalTime,
+ V_Period,
+ V_OffsetDateTime,
+ Unsupported
+ }
+}
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/util/ByteUtils.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/util/ByteUtils.java
new file mode 100644
index 0000000..6e223a3
--- /dev/null
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/util/ByteUtils.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.util;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+/** Utility class for operations on byte arrays. */
+public class ByteUtils {
+
+ /**
+ * Converts a byte array to an int value.
+ *
+ * @param bytes byte array
+ * @return the int value
+ */
+ public static int toInt(byte[] bytes) {
+ return toInt(bytes, 0);
+ }
+
+ /**
+ * Converts a byte array to an int value.
+ *
+ * @param bytes byte array
+ * @param offset offset into array
+ * @return the int value
+ * @throws IllegalArgumentException if there's not enough room in the array at the offset
+ * indicated.
+ */
+ public static int toInt(byte[] bytes, int offset) {
+ if (offset + Integer.BYTES > bytes.length) {
+ throw explainWrongLengthOrOffset(bytes, offset, Integer.BYTES, Integer.BYTES);
+ }
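+        // assemble the value big-endian: most significant byte first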
+ int n = 0;
+ for (int i = offset; i < (offset + Integer.BYTES); i++) {
+ n <<= 8;
+ n ^= bytes[i] & 0xFF;
+ }
+ return n;
+ }
+
+ /**
+ * Convert a byte array to a boolean.
+ *
+ * @param b array
+ * @return True or false.
+ */
+ public static boolean toBoolean(final byte[] b) {
+ return toBoolean(b, 0);
+ }
+
+ /**
+ * Convert a byte array to a boolean.
+ *
+ * @param b array
+ * @param offset offset into array
+ * @return True or false.
+ */
+ public static boolean toBoolean(final byte[] b, final int offset) {
+ if (offset + 1 > b.length) {
+ throw explainWrongLengthOrOffset(b, offset, 1, 1);
+ }
+ return b[offset] != (byte) 0;
+ }
+
+ /**
+ * Converts a byte array to a long value.
+ *
+ * @param bytes array
+ * @return the long value
+ */
+ public static long toLong(byte[] bytes) {
+ return toLong(bytes, 0);
+ }
+
+ /**
+ * Converts a byte array to a long value.
+ *
+ * @param bytes array of bytes
+ * @param offset offset into array
+ * @return the long value
+ * @throws IllegalArgumentException if there's not enough room in the array at the offset
+ * indicated.
+ */
+ public static long toLong(byte[] bytes, int offset) {
+ if (offset + Long.BYTES > bytes.length) {
+ throw explainWrongLengthOrOffset(bytes, offset, Long.BYTES, Long.BYTES);
+ }
+ long l = 0;
+ for (int i = offset; i < offset + Long.BYTES; i++) {
+ l <<= 8;
+ l ^= bytes[i] & 0xFF;
+ }
+ return l;
+ }
+
+ /**
+     * Converts a byte array to a float value, presuming IEEE 754 "single format" encoding.
+     *
+     * @param bytes byte array
+     * @return the float value
+ */
+ public static float toFloat(byte[] bytes) {
+ return toFloat(bytes, 0);
+ }
+
+ /**
+     * Converts a byte array to a float value, presuming IEEE 754 "single format" encoding.
+     *
+     * @param bytes array to convert
+     * @param offset offset into array
+     * @return the float value
+ */
+ public static float toFloat(byte[] bytes, int offset) {
+ return Float.intBitsToFloat(toInt(bytes, offset));
+ }
+
+ /**
+     * Converts a byte array to a double value.
+     *
+     * @param bytes byte array
+     * @return the double value
+ */
+ public static double toDouble(final byte[] bytes) {
+ return toDouble(bytes, 0);
+ }
+
+ /**
+     * Converts a byte array to a double value.
+     *
+     * @param bytes byte array
+     * @param offset offset into array
+     * @return the double value
+ */
+ public static double toDouble(final byte[] bytes, final int offset) {
+ return Double.longBitsToDouble(toLong(bytes, offset));
+ }
+
+ /**
+ * Converts a byte array to a short value.
+ *
+ * @param bytes byte array
+ * @return the short value
+ */
+ public static short toShort(byte[] bytes) {
+ return toShort(bytes, 0);
+ }
+
+ /**
+ * Converts a byte array to a short value.
+ *
+ * @param bytes byte array
+ * @param offset offset into array
+ * @return the short value
+ * @throws IllegalArgumentException if there's not enough room in the array at the offset
+ * indicated.
+ */
+ public static short toShort(byte[] bytes, int offset) {
+ if (offset + Short.BYTES > bytes.length) {
+ throw explainWrongLengthOrOffset(bytes, offset, Short.BYTES, Short.BYTES);
+ }
+ short n = 0;
+ n ^= bytes[offset] & 0xFF;
+ n <<= 8;
+ n ^= bytes[offset + 1] & 0xFF;
+ return n;
+ }
+
+ // ---------------------------------------------------------------------------------------------------------
+
+ private static IllegalArgumentException explainWrongLengthOrOffset(
+ final byte[] bytes, final int offset, final int length, final int expectedLength) {
+ String exceptionMessage;
+ if (length != expectedLength) {
+ exceptionMessage = "Wrong length: " + length + ", expected " + expectedLength;
+ } else {
+ exceptionMessage =
+ "offset ("
+ + offset
+ + ") + length ("
+ + length
+                            + ") exceeds the"
+ + " capacity of the array: "
+ + bytes.length;
+ }
+ return new IllegalArgumentException(exceptionMessage);
+ }
+
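+    /**
+     * Decodes a BigDecimal serialized as a 4-byte big-endian scale followed by the
+     * two's-complement bytes of the unscaled value; for example, {@code [0,0,0,2, 0x04,0xD2]}
+     * decodes to {@code 12.34}. Returns null when the input is null or shorter than 5 bytes.
+     */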
+ public static BigDecimal toBigDecimal(byte[] bytes) {
+ return toBigDecimal(bytes, 0, bytes.length);
+ }
+
+ public static BigDecimal toBigDecimal(byte[] bytes, int offset, int length) {
+ if (bytes != null && length >= 5 && offset + length <= bytes.length) {
+ int scale = toInt(bytes, offset);
+ byte[] tcBytes = new byte[length - 4];
+ System.arraycopy(bytes, offset + 4, tcBytes, 0, length - 4);
+ return new BigDecimal(new BigInteger(tcBytes), scale);
+ } else {
+ return null;
+ }
+ }
+}
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/util/StringSerializer.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/util/StringSerializer.java
new file mode 100644
index 0000000..b468ac9
--- /dev/null
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/util/StringSerializer.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.util;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.data.util.DataFormatConverters.TimestampConverter;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Base64;
+import java.util.Set;
+
+/** Deserializes string-encoded field values into Flink's internal data structures. */
+public class StringSerializer {
+
+    public static final TimestampConverter timestampConverter = new TimestampConverter(3);
+    // MIME decoder tolerates line separators in the encoded input
+    private static final Base64.Decoder decoder = Base64.getMimeDecoder();
+
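+    /**
+     * Deserializes a string-encoded field into Flink's internal representation for the given
+     * type; any value contained in {@code nullValues} is mapped to null.
+     */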
+ public static Object deserialize(
+ String value,
+ ByteSerializer.ValueType type,
+ DataType dataType,
+ Set<String> nullValues) {
+ return deserialize(value, type, dataType, nullValues, false);
+ }
+
+ public static Object deserialize(
+ String value,
+ ByteSerializer.ValueType type,
+ DataType dataType,
+ Set<String> nullValues,
+ Boolean isRGData) {
+ if (null != nullValues && nullValues.contains(value)) {
+ return null;
+ }
+ switch (type) {
+ case V_ByteArray: // byte[]
+ if (isRGData) {
+ byte[] bytes = null;
+ try {
+                        bytes = decoder.decode(value);
+                    } catch (Exception e) {
+                        // tolerate malformed Base64 input and fall through to a null payload
+ }
+ return bytes;
+ } else {
+                    return value.getBytes(StandardCharsets.UTF_8);
+ }
+ case V_String:
+ return BinaryStringData.fromString(value);
+ case V_Byte: // byte
+ return null == value ? null : Byte.parseByte(value);
+ case V_Short:
+ return null == value ? null : Short.parseShort(value);
+ case V_Integer:
+ return null == value ? null : Integer.parseInt(value);
+ case V_Long:
+ return null == value ? null : Long.parseLong(value);
+ case V_Float:
+ return null == value ? null : Float.parseFloat(value);
+ case V_Double:
+ return null == value ? null : Double.parseDouble(value);
+ case V_Boolean:
+ return null == value ? null : parseBoolean(value);
+ case V_Timestamp: // sql.Timestamp encoded as long
+ if (isRGData) {
+ return null == value ? null : Long.parseLong(value);
+ }
+ if (null == value) {
+ return null;
+ } else {
+ try {
+ return timestampConverter.toInternal(new Timestamp(Long.parseLong(value)));
+ } catch (NumberFormatException e) {
+ return timestampConverter.toInternal(Timestamp.valueOf(value));
+ }
+ }
+ case V_Date: // sql.Date encoded as long
+ if (isRGData) {
+ return null == value ? null : Long.parseLong(value);
+ }
+ return null == value
+ ? null
+ : DataFormatConverters.DateConverter.INSTANCE.toInternal(
+ Date.valueOf(value));
+ case V_Time: // sql.Time encoded as long
+ if (isRGData) {
+ return null == value ? null : Long.parseLong(value);
+ }
+ return null == value
+ ? null
+ : DataFormatConverters.TimeConverter.INSTANCE.toInternal(
+ new Time(Long.parseLong(value)));
+ case V_BigDecimal:
+ DecimalType decimalType = (DecimalType) dataType.getLogicalType();
+ return value == null
+ ? null
+ : DecimalData.fromBigDecimal(
+ new BigDecimal(value),
+ decimalType.getPrecision(),
+ decimalType.getScale());
+ case V_BigInteger:
+ return null == value ? null : new BigInteger(value);
+
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ public static Object deserialize(
+ String value, ByteSerializer.ValueType type, DataType dataType, Boolean isRGData) {
+ return deserialize(value, type, dataType, null, isRGData);
+ }
+
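+    /**
+     * Parses "true"/"false" (case-insensitive) and the numeric flags "1"/"0"; any other
+     * input, including null, is rejected with an IllegalArgumentException.
+     */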
+ public static Boolean parseBoolean(String s) {
+ if (s != null) {
+ if (s.equalsIgnoreCase("true") || s.equalsIgnoreCase("false")) {
+ return Boolean.valueOf(s);
+ }
+
+ if (s.equals("1")) {
+ return Boolean.TRUE;
+ }
+
+ if (s.equals("0")) {
+ return Boolean.FALSE;
+ }
+ }
+
+ throw new IllegalArgumentException();
+ }
+}
diff --git a/rocketmq-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/rocketmq-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..32de8b2
--- /dev/null
+++ b/rocketmq-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.rocketmq.flink.source.table.RocketMQDynamicTableSourceFactory
\ No newline at end of file
diff --git a/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRowDeserializationSchemaTest.java b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRowDeserializationSchemaTest.java
new file mode 100644
index 0000000..a904b04
--- /dev/null
+++ b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRowDeserializationSchemaTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.reader.deserializer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+import org.powermock.reflect.Whitebox;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+
+/** Test for {@link RocketMQRowDeserializationSchema}. */
+public class RocketMQRowDeserializationSchemaTest {
+
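+    /** Verifies that message bodies and system properties survive batch deserialization. */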
+ @Test
+ public void testDeserialize() {
+ TableSchema tableSchema =
+ new TableSchema.Builder()
+ .field("int", DataTypes.INT())
+ .field("varchar", DataTypes.VARCHAR(100))
+ .field("bool", DataTypes.BOOLEAN())
+ .field("char", DataTypes.CHAR(5))
+ .field("tinyint", DataTypes.TINYINT())
+ .field("decimal", DataTypes.DECIMAL(10, 5))
+ .field("smallint", DataTypes.SMALLINT())
+ .field("bigint", DataTypes.BIGINT())
+ .field("float", DataTypes.FLOAT())
+ .field("double", DataTypes.DOUBLE())
+ .field("date", DataTypes.DATE())
+ .field("time", DataTypes.TIME())
+ .field("timestamp", DataTypes.TIMESTAMP())
+ .build();
+ RocketMQRowDeserializationSchema recordDeserializer =
+ new RocketMQRowDeserializationSchema(tableSchema, new HashMap<>(), false, null);
+ RowDeserializationSchema sourceDeserializer = mock(RowDeserializationSchema.class);
+ InitializationContext initializationContext = mock(InitializationContext.class);
+ doNothing().when(sourceDeserializer).open(initializationContext);
+ Whitebox.setInternalState(recordDeserializer, "deserializationSchema", sourceDeserializer);
+ recordDeserializer.open(initializationContext);
+ MessageExt firstMsg =
+ new MessageExt(
+ 1,
+ System.currentTimeMillis(),
+ InetSocketAddress.createUnresolved("localhost", 8080),
+ System.currentTimeMillis(),
+ InetSocketAddress.createUnresolved("localhost", 8088),
+ "184019387");
+ firstMsg.setBody("test_deserializer_raw_messages_1".getBytes());
+ MessageExt secondMsg =
+ new MessageExt(
+ 1,
+ System.currentTimeMillis(),
+ InetSocketAddress.createUnresolved("localhost", 8081),
+ System.currentTimeMillis(),
+ InetSocketAddress.createUnresolved("localhost", 8087),
+ "284019387");
+ secondMsg.setBody("test_deserializer_raw_messages_2".getBytes());
+ MessageExt thirdMsg =
+ new MessageExt(
+ 1,
+ System.currentTimeMillis(),
+ InetSocketAddress.createUnresolved("localhost", 8082),
+ System.currentTimeMillis(),
+ InetSocketAddress.createUnresolved("localhost", 8086),
+ "384019387");
+ thirdMsg.setBody("test_deserializer_raw_messages_3".getBytes());
+ List<MessageExt> messages = Arrays.asList(firstMsg, secondMsg, thirdMsg);
+ Collector<RowData> collector = mock(Collector.class);
+ recordDeserializer.deserialize(messages, collector);
+
+ assertEquals(3, recordDeserializer.getBytesMessages().size());
+        assertArrayEquals(
+                firstMsg.getBody(), recordDeserializer.getBytesMessages().get(0).getData());
+ assertEquals(
+ String.valueOf(firstMsg.getStoreTimestamp()),
+ recordDeserializer.getBytesMessages().get(0).getProperty("__store_timestamp__"));
+ assertEquals(
+ String.valueOf(firstMsg.getBornTimestamp()),
+ recordDeserializer.getBytesMessages().get(0).getProperty("__born_timestamp__"));
+ assertEquals(
+ String.valueOf(firstMsg.getQueueId()),
+ recordDeserializer.getBytesMessages().get(0).getProperty("__queue_id__"));
+ assertEquals(
+ String.valueOf(firstMsg.getQueueOffset()),
+ recordDeserializer.getBytesMessages().get(0).getProperty("__queue_offset__"));
+        assertArrayEquals(
+                secondMsg.getBody(), recordDeserializer.getBytesMessages().get(1).getData());
+ assertEquals(
+ String.valueOf(secondMsg.getStoreTimestamp()),
+ recordDeserializer.getBytesMessages().get(1).getProperty("__store_timestamp__"));
+ assertEquals(
+ String.valueOf(secondMsg.getBornTimestamp()),
+ recordDeserializer.getBytesMessages().get(1).getProperty("__born_timestamp__"));
+ assertEquals(
+ String.valueOf(secondMsg.getQueueId()),
+ recordDeserializer.getBytesMessages().get(1).getProperty("__queue_id__"));
+ assertEquals(
+ String.valueOf(secondMsg.getQueueOffset()),
+ recordDeserializer.getBytesMessages().get(1).getProperty("__queue_offset__"));
+        assertArrayEquals(
+                thirdMsg.getBody(), recordDeserializer.getBytesMessages().get(2).getData());
+ assertEquals(
+ String.valueOf(thirdMsg.getStoreTimestamp()),
+ recordDeserializer.getBytesMessages().get(2).getProperty("__store_timestamp__"));
+ assertEquals(
+ String.valueOf(thirdMsg.getBornTimestamp()),
+ recordDeserializer.getBytesMessages().get(2).getProperty("__born_timestamp__"));
+ assertEquals(
+ String.valueOf(thirdMsg.getQueueId()),
+ recordDeserializer.getBytesMessages().get(2).getProperty("__queue_id__"));
+ assertEquals(
+ String.valueOf(thirdMsg.getQueueOffset()),
+ recordDeserializer.getBytesMessages().get(2).getProperty("__queue_offset__"));
+ }
+}