[#715] Support the RocketMQ TableSource based on the new Source interface (#716)

diff --git a/rocketmq-flink/pom.xml b/rocketmq-flink/pom.xml
index d5fc49f..abec905 100644
--- a/rocketmq-flink/pom.xml
+++ b/rocketmq-flink/pom.xml
@@ -34,7 +34,7 @@
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
         <rocketmq.version>4.7.1</rocketmq.version>
-        <flink.version>1.12.2</flink.version>
+        <flink.version>1.13.0</flink.version>
         <commons-lang.version>2.5</commons-lang.version>
         <scala.binary.version>2.11</scala.binary.version>
         <spotless.version>2.4.2</spotless.version>
@@ -78,6 +78,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
             <artifactId>flink-queryable-state-runtime_${scala.binary.version}</artifactId>
             <version>${flink.version}</version>
         </dependency>
@@ -102,6 +108,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-test</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
 
         <dependency>
             <groupId>commons-lang</groupId>
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
index b899618..79a8149 100644
--- a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
@@ -24,7 +24,7 @@
 import org.apache.rocketmq.flink.source.reader.RocketMQPartitionSplitReader;
 import org.apache.rocketmq.flink.source.reader.RocketMQRecordEmitter;
 import org.apache.rocketmq.flink.source.reader.RocketMQSourceReader;
-import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQRecordDeserializationSchema;
+import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema;
 import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
 import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplitSerializer;
 
@@ -52,10 +52,11 @@
 public class RocketMQSource<OUT>
         implements Source<OUT, RocketMQPartitionSplit, RocketMQSourceEnumState>,
                 ResultTypeQueryable<OUT> {
-    private static final long serialVersionUID = -6755372893283732098L;
+    private static final long serialVersionUID = -1L;
 
     private final String topic;
     private final String consumerGroup;
+    private final String nameServerAddress;
     private final String tag;
     private final long stopInMs;
     private final long startTime;
@@ -64,20 +65,22 @@
 
     // Boundedness
     private final Boundedness boundedness;
-    private final RocketMQRecordDeserializationSchema<OUT> deserializationSchema;
+    private final RocketMQDeserializationSchema<OUT> deserializationSchema;
 
     public RocketMQSource(
             String topic,
             String consumerGroup,
+            String nameServerAddress,
             String tag,
             long stopInMs,
             long startTime,
             long startOffset,
             long partitionDiscoveryIntervalMs,
             Boundedness boundedness,
-            RocketMQRecordDeserializationSchema<OUT> deserializationSchema) {
+            RocketMQDeserializationSchema<OUT> deserializationSchema) {
         this.topic = topic;
         this.consumerGroup = consumerGroup;
+        this.nameServerAddress = nameServerAddress;
         this.tag = tag;
         this.stopInMs = stopInMs;
         this.startTime = startTime;
@@ -93,8 +96,8 @@
     }
 
     @Override
-    public SourceReader<OUT, RocketMQPartitionSplit> createReader(SourceReaderContext readerContext)
-            throws Exception {
+    public SourceReader<OUT, RocketMQPartitionSplit> createReader(
+            SourceReaderContext readerContext) {
         FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<OUT, Long, Long>>> elementsQueue =
                 new FutureCompletingBlockingQueue<>();
         deserializationSchema.open(
@@ -115,6 +118,7 @@
                         new RocketMQPartitionSplitReader<>(
                                 topic,
                                 consumerGroup,
+                                nameServerAddress,
                                 tag,
                                 stopInMs,
                                 startTime,
@@ -136,6 +140,7 @@
         return new RocketMQSourceEnumerator(
                 topic,
                 consumerGroup,
+                nameServerAddress,
                 stopInMs,
                 startOffset,
                 partitionDiscoveryIntervalMs,
@@ -150,6 +155,7 @@
         return new RocketMQSourceEnumerator(
                 topic,
                 consumerGroup,
+                nameServerAddress,
                 stopInMs,
                 startOffset,
                 partitionDiscoveryIntervalMs,
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java
new file mode 100644
index 0000000..064e193
--- /dev/null
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.common;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Includes config options of RocketMQ connector type. */
+public class RocketMQOptions {
+
+    public static final ConfigOption<String> TOPIC = ConfigOptions.key("topic").noDefaultValue();
+
+    public static final ConfigOption<String> CONSUMER_GROUP =
+            ConfigOptions.key("consumerGroup").noDefaultValue();
+
+    public static final ConfigOption<String> NAME_SERVER_ADDRESS =
+            ConfigOptions.key("nameServerAddress").noDefaultValue();
+
+    public static final ConfigOption<String> OPTIONAL_TAG =
+            ConfigOptions.key("tag").noDefaultValue();
+
+    public static final ConfigOption<Integer> OPTIONAL_START_MESSAGE_OFFSET =
+            ConfigOptions.key("startMessageOffset").defaultValue(-1);
+
+    public static final ConfigOption<Long> OPTIONAL_START_TIME_MILLS =
+            ConfigOptions.key("startTimeMs".toLowerCase()).longType().defaultValue(-1L);
+
+    public static final ConfigOption<String> OPTIONAL_START_TIME =
+            ConfigOptions.key("startTime".toLowerCase()).stringType().noDefaultValue();
+
+    public static final ConfigOption<String> OPTIONAL_END_TIME =
+            ConfigOptions.key("endTime").noDefaultValue();
+
+    public static final ConfigOption<String> OPTIONAL_TIME_ZONE =
+            ConfigOptions.key("timeZone".toLowerCase()).stringType().noDefaultValue();
+
+    public static final ConfigOption<Long> OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS =
+            ConfigOptions.key("partitionDiscoveryIntervalMs").longType().defaultValue(30000L);
+
+    public static final ConfigOption<String> OPTIONAL_ENCODING =
+            ConfigOptions.key("encoding").stringType().defaultValue("UTF-8");
+
+    public static final ConfigOption<String> OPTIONAL_FIELD_DELIMITER =
+            ConfigOptions.key("fieldDelimiter").stringType().defaultValue("\u0001");
+
+    public static final ConfigOption<String> OPTIONAL_LINE_DELIMITER =
+            ConfigOptions.key("lineDelimiter").stringType().defaultValue("\n");
+
+    public static final ConfigOption<Boolean> OPTIONAL_COLUMN_ERROR_DEBUG =
+            ConfigOptions.key("columnErrorDebug").booleanType().defaultValue(true);
+
+    public static final ConfigOption<String> OPTIONAL_LENGTH_CHECK =
+            ConfigOptions.key("lengthCheck").stringType().defaultValue("NONE");
+}
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
index 08290c6..61b563a 100644
--- a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
@@ -19,7 +19,6 @@
 package org.apache.rocketmq.flink.source.enumerator;
 
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
-import org.apache.rocketmq.client.consumer.MQPullConsumer;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
@@ -38,6 +37,7 @@
 
 import javax.annotation.Nullable;
 
+import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -59,6 +59,8 @@
     private final String topic;
     /** The consumer group used for this RocketMQSource. */
     private final String consumerGroup;
+    /** The name server address used for this RocketMQSource. */
+    private final String nameServerAddress;
     /** The stop timestamp for this RocketMQSource. */
     private final long stopInMs;
     /** The start offset for this RocketMQSource. */
@@ -85,12 +87,13 @@
     private final Map<Integer, Set<RocketMQPartitionSplit>> pendingPartitionSplitAssignment;
 
     // Lazily instantiated or mutable fields.
-    private MQPullConsumer consumer;
+    private DefaultMQPullConsumer consumer;
     private boolean noMoreNewPartitionSplits = false;
 
     public RocketMQSourceEnumerator(
             String topic,
             String consumerGroup,
+            String nameServerAddress,
             long stopInMs,
             long startOffset,
             long partitionDiscoveryIntervalMs,
@@ -99,6 +102,7 @@
         this(
                 topic,
                 consumerGroup,
+                nameServerAddress,
                 stopInMs,
                 startOffset,
                 partitionDiscoveryIntervalMs,
@@ -110,6 +114,7 @@
     public RocketMQSourceEnumerator(
             String topic,
             String consumerGroup,
+            String nameServerAddress,
             long stopInMs,
             long startOffset,
             long partitionDiscoveryIntervalMs,
@@ -118,6 +123,7 @@
             Map<Integer, List<RocketMQPartitionSplit>> currentSplitsAssignments) {
         this.topic = topic;
         this.consumerGroup = consumerGroup;
+        this.nameServerAddress = nameServerAddress;
         this.stopInMs = stopInMs;
         this.startOffset = startOffset;
         this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
@@ -180,7 +186,7 @@
     }
 
     @Override
-    public RocketMQSourceEnumState snapshotState() {
+    public RocketMQSourceEnumState snapshotState(long checkpointId) {
         return new RocketMQSourceEnumState(readerIdToSplitAssignments);
     }
 
@@ -298,6 +304,14 @@
     private void initialRocketMQConsumer() {
         try {
             consumer = new DefaultMQPullConsumer(consumerGroup);
+            consumer.setNamesrvAddr(nameServerAddress);
+            consumer.setInstanceName(
+                    String.join(
+                            "||",
+                            ManagementFactory.getRuntimeMXBean().getName(),
+                            topic,
+                            consumerGroup,
+                            "" + System.nanoTime()));
             consumer.start();
         } catch (MQClientException e) {
             LOG.error("Failed to initial RocketMQ consumer.", e);
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
index 3bbeec8..41fbbea 100644
--- a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
@@ -19,13 +19,12 @@
 package org.apache.rocketmq.flink.source.reader;
 
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
-import org.apache.rocketmq.client.consumer.MQPullConsumer;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQRecordDeserializationSchema;
+import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema;
 import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
@@ -43,6 +42,7 @@
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -70,12 +70,12 @@
     private final long startTime;
     private final long startOffset;
 
-    private final RocketMQRecordDeserializationSchema<T> deserializationSchema;
+    private final RocketMQDeserializationSchema<T> deserializationSchema;
     private final Map<Tuple3<String, String, Integer>, Long> startingOffsets;
     private final Map<Tuple3<String, String, Integer>, Long> stoppingTimestamps;
     private final SimpleCollector<T> collector;
 
-    private MQPullConsumer consumer;
+    private DefaultMQPullConsumer consumer;
 
     private volatile boolean wakeup = false;
 
@@ -84,11 +84,12 @@
     public RocketMQPartitionSplitReader(
             String topic,
             String consumerGroup,
+            String nameServerAddress,
             String tag,
             long stopInMs,
             long startTime,
             long startOffset,
-            RocketMQRecordDeserializationSchema<T> deserializationSchema) {
+            RocketMQDeserializationSchema<T> deserializationSchema) {
         this.topic = topic;
         this.tag = tag;
         this.stopInMs = stopInMs;
@@ -98,7 +99,7 @@
         this.startingOffsets = new HashMap<>();
         this.stoppingTimestamps = new HashMap<>();
         this.collector = new SimpleCollector<>();
-        initialRocketMQConsumer(consumerGroup);
+        initialRocketMQConsumer(consumerGroup, nameServerAddress);
     }
 
     @Override
@@ -280,9 +281,17 @@
 
     // --------------- private helper method ----------------------
 
-    private void initialRocketMQConsumer(String consumerGroup) {
+    private void initialRocketMQConsumer(String consumerGroup, String nameServerAddress) {
         try {
             consumer = new DefaultMQPullConsumer(consumerGroup);
+            consumer.setNamesrvAddr(nameServerAddress);
+            consumer.setInstanceName(
+                    String.join(
+                            "||",
+                            ManagementFactory.getRuntimeMXBean().getName(),
+                            topic,
+                            consumerGroup,
+                            "" + System.nanoTime()));
             consumer.start();
         } catch (MQClientException e) {
             LOG.error("Failed to initial RocketMQ consumer.", e);
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/BytesMessage.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/BytesMessage.java
new file mode 100644
index 0000000..d109a7f
--- /dev/null
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/BytesMessage.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.reader.deserializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Message contains byte array. */
+public class BytesMessage {
+
+    private byte[] data;
+    private Map<String, String> properties = new HashMap<>();
+
+    public byte[] getData() {
+        return data;
+    }
+
+    public void setData(byte[] data) {
+        this.data = data;
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Map<String, String> props) {
+        this.properties = props;
+    }
+
+    public Object getProperty(String key) {
+        return properties.get(key);
+    }
+
+    public void setProperty(String key, String value) {
+        properties.put(key, value);
+    }
+}
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DeserializationSchema.java
similarity index 77%
copy from rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java
copy to rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DeserializationSchema.java
index 455f8af..3b087cc 100644
--- a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DeserializationSchema.java
@@ -1,7 +1,5 @@
 package org.apache.rocketmq.flink.source.reader.deserializer;
 
-import org.apache.rocketmq.common.message.MessageExt;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -9,11 +7,9 @@
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.List;
 
-/** An interface for the deserialization of RocketMQ records. */
-public interface RocketMQRecordDeserializationSchema<T>
-        extends Serializable, ResultTypeQueryable<T> {
+/** An interface for the deserialization of records. */
+public interface DeserializationSchema<IN, OUT> extends Serializable, ResultTypeQueryable<OUT> {
 
     /**
      * Initialization method for the schema. It is called before the actual working methods {@link
@@ -35,9 +31,9 @@
      * records can be buffered in memory or collecting records might delay emitting checkpoint
      * barrier.
      *
-     * @param record The MessageExts to deserialize.
+     * @param record The record to deserialize.
      * @param out The collector to put the resulting messages.
      */
     @PublicEvolving
-    void deserialize(List<MessageExt> record, Collector<T> out) throws IOException;
+    void deserialize(IN record, Collector<OUT> out) throws IOException;
 }
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DirtyDataStrategy.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DirtyDataStrategy.java
new file mode 100644
index 0000000..06a0c2d
--- /dev/null
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/DirtyDataStrategy.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.reader.deserializer;
+
+/** Dirty data process strategy. */
+public enum DirtyDataStrategy {
+    SKIP,
+    SKIP_SILENT,
+    CUT,
+    PAD,
+    NULL,
+    EXCEPTION
+}
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java
similarity index 83%
rename from rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java
rename to rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java
index 455f8af..6358e4c 100644
--- a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRecordDeserializationSchema.java
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQDeserializationSchema.java
@@ -4,16 +4,14 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.util.Collector;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.List;
 
 /** An interface for the deserialization of RocketMQ records. */
-public interface RocketMQRecordDeserializationSchema<T>
-        extends Serializable, ResultTypeQueryable<T> {
+public interface RocketMQDeserializationSchema<T>
+        extends DeserializationSchema<List<MessageExt>, T> {
 
     /**
      * Initialization method for the schema. It is called before the actual working methods {@link
@@ -25,7 +23,7 @@
      * @param context Contextual information that can be used during initialization.
      */
     @PublicEvolving
-    default void open(InitializationContext context) throws Exception {}
+    default void open(InitializationContext context) {}
 
     /**
      * Deserializes the byte message.
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRowDeserializationSchema.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRowDeserializationSchema.java
new file mode 100644
index 0000000..5bd990e
--- /dev/null
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRowDeserializationSchema.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.reader.deserializer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.flink.source.reader.deserializer.RowDeserializationSchema.MetadataConverter;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A row data wrapper class that wraps a {@link RocketMQDeserializationSchema} to deserialize {@link
+ * MessageExt}.
+ */
+public class RocketMQRowDeserializationSchema implements RocketMQDeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowDeserializationSchema deserializationSchema;
+
+    private transient List<BytesMessage> bytesMessages = new ArrayList<>(1);
+
+    public RocketMQRowDeserializationSchema(
+            TableSchema tableSchema,
+            Map<String, String> properties,
+            boolean hasMetadata,
+            MetadataConverter[] metadataConverters) {
+        deserializationSchema =
+                new RowDeserializationSchema.Builder()
+                        .setProperties(properties)
+                        .setTableSchema(tableSchema)
+                        .setHasMetadata(hasMetadata)
+                        .setMetadataConverters(metadataConverters)
+                        .build();
+    }
+
+    @Override
+    public void open(InitializationContext context) {
+        deserializationSchema.open(context);
+        bytesMessages = new ArrayList<>();
+    }
+
+    @Override
+    public void deserialize(List<MessageExt> input, Collector<RowData> collector) {
+        extractMessages(input);
+        deserializationSchema.deserialize(bytesMessages, collector);
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+
+    private void extractMessages(List<MessageExt> messages) {
+        bytesMessages = new ArrayList<>(messages.size());
+        for (MessageExt message : messages) {
+            BytesMessage bytesMessage = new BytesMessage();
+            bytesMessage.setData(message.getBody());
+            if (message.getProperties() != null) {
+                bytesMessage.setProperties(message.getProperties());
+            }
+            bytesMessage.setProperty("__topic__", message.getTopic());
+            bytesMessage.setProperty(
+                    "__store_timestamp__", String.valueOf(message.getStoreTimestamp()));
+            bytesMessage.setProperty(
+                    "__born_timestamp__", String.valueOf(message.getBornTimestamp()));
+            bytesMessage.setProperty("__queue_id__", String.valueOf(message.getQueueId()));
+            bytesMessage.setProperty("__queue_offset__", String.valueOf(message.getQueueOffset()));
+            bytesMessage.setProperty("__msg_id__", message.getMsgId());
+            bytesMessage.setProperty("__keys__", message.getKeys());
+            bytesMessage.setProperty("__tags__", message.getTags());
+            bytesMessages.add(bytesMessage);
+        }
+    }
+
+    @VisibleForTesting
+    public List<BytesMessage> getBytesMessages() {
+        return bytesMessages;
+    }
+}
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
new file mode 100644
index 0000000..f106693
--- /dev/null
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
@@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.reader.deserializer;
+
+import org.apache.rocketmq.flink.source.util.ByteSerializer;
+import org.apache.rocketmq.flink.source.util.ByteSerializer.ValueType;
+import org.apache.rocketmq.flink.source.util.StringSerializer;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The row based implementation of {@link DeserializationSchema} for the deserialization of records.
+ */
+public class RowDeserializationSchema
+        implements DeserializationSchema<List<BytesMessage>, RowData> {
+
+    private static final long serialVersionUID = -1L;
+    private static final Logger logger = LoggerFactory.getLogger(RowDeserializationSchema.class);
+
+    private transient TableSchema tableSchema;
+    private final DirtyDataStrategy formatErrorStrategy;
+    private final DirtyDataStrategy fieldMissingStrategy;
+    private final DirtyDataStrategy fieldIncrementStrategy;
+    private final String encoding;
+    private final String fieldDelimiter;
+    private final String lineDelimiter;
+    private final boolean columnErrorDebug;
+    private final MetadataCollector metadataCollector;
+    private final int totalColumnSize;
+    private final int dataColumnSize;
+    private final ValueType[] fieldTypes;
+    private transient DataType[] fieldDataTypes;
+    private final Set<String> headerFields;
+    private final Map<String, String> properties;
+    private final Map<String, Integer> columnIndexMapping;
+    private final Map<Integer, Integer> dataIndexMapping;
+    private long lastLogExceptionTime;
+    private long lastLogHandleFieldTime;
+
+    private static final int DEFAULT_LOG_INTERVAL_MS = 60 * 1000;
+
+    public RowDeserializationSchema(
+            TableSchema tableSchema,
+            DirtyDataStrategy formatErrorStrategy,
+            DirtyDataStrategy fieldMissingStrategy,
+            DirtyDataStrategy fieldIncrementStrategy,
+            String encoding,
+            String fieldDelimiter,
+            String lineDelimiter,
+            boolean columnErrorDebug,
+            boolean hasMetadata,
+            MetadataConverter[] metadataConverters,
+            List<String> headerFields,
+            Map<String, String> properties) {
+        this.tableSchema = tableSchema;
+        this.formatErrorStrategy = formatErrorStrategy;
+        this.fieldMissingStrategy = fieldMissingStrategy;
+        this.fieldIncrementStrategy = fieldIncrementStrategy;
+        this.columnErrorDebug = columnErrorDebug;
+        this.encoding = encoding;
+        this.fieldDelimiter = StringEscapeUtils.unescapeJava(fieldDelimiter);
+        this.lineDelimiter = StringEscapeUtils.unescapeJava(lineDelimiter);
+        this.metadataCollector = new MetadataCollector(hasMetadata, metadataConverters);
+        this.headerFields = headerFields == null ? null : new HashSet<>(headerFields);
+        this.properties = properties;
+        this.totalColumnSize = tableSchema.getFieldNames().length;
+        int dataColumnSize = 0;
+        this.fieldTypes = new ValueType[totalColumnSize];
+        this.columnIndexMapping = new HashMap<>();
+        this.dataIndexMapping = new HashMap<>();
+        for (int index = 0; index < tableSchema.getFieldNames().length; index++) {
+            this.columnIndexMapping.put(tableSchema.getFieldNames()[index], index);
+        }
+        for (int index = 0; index < totalColumnSize; index++) {
+            ValueType type =
+                    ByteSerializer.getTypeIndex(tableSchema.getFieldTypes()[index].getTypeClass());
+            this.fieldTypes[index] = type;
+            if (!isHeaderField(index)) {
+                dataIndexMapping.put(dataColumnSize, index);
+                dataColumnSize++;
+            }
+        }
+        this.dataColumnSize = dataColumnSize;
+    }
+
+    @Override
+    public void open(InitializationContext context) {
+        DescriptorProperties descriptorProperties = new DescriptorProperties();
+        descriptorProperties.putProperties(properties);
+        this.tableSchema = SchemaValidator.deriveTableSinkSchema(descriptorProperties);
+        this.fieldDataTypes = tableSchema.getFieldDataTypes();
+        this.lastLogExceptionTime = System.currentTimeMillis();
+        this.lastLogHandleFieldTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public void deserialize(List<BytesMessage> messages, Collector<RowData> collector) {
+        metadataCollector.collector = collector;
+        deserialize(messages, metadataCollector);
+    }
+
+    private void deserialize(List<BytesMessage> messages, MetadataCollector collector) {
+        if (null == messages || messages.size() == 0) {
+            return;
+        }
+        for (BytesMessage message : messages) {
+            collector.message = message;
+            if (isOnlyHaveVarbinaryDataField()) {
+                GenericRowData rowData = new GenericRowData(totalColumnSize);
+                int dataIndex = dataIndexMapping.get(0);
+                rowData.setField(dataIndex, message.getData());
+                for (int index = 0; index < totalColumnSize; index++) {
+                    if (index == dataIndex) {
+                        continue;
+                    }
+                    String headerValue = getHeaderValue(message, index);
+                    rowData.setField(
+                            index,
+                            StringSerializer.deserialize(
+                                    headerValue,
+                                    fieldTypes[index],
+                                    fieldDataTypes[index],
+                                    new HashSet<>()));
+                }
+                collector.collect(rowData);
+            } else if (isAllHeaderField()) {
+                GenericRowData rowData = new GenericRowData(totalColumnSize);
+                for (int index = 0; index < totalColumnSize; index++) {
+                    String headerValue = getHeaderValue(message, index);
+                    rowData.setField(
+                            index,
+                            StringSerializer.deserialize(
+                                    headerValue,
+                                    fieldTypes[index],
+                                    fieldDataTypes[index],
+                                    new HashSet<>()));
+                }
+                collector.collect(rowData);
+            } else {
+                if (message.getData() == null) {
+                    logger.info("Deserialize empty BytesMessage body, ignore the empty message.");
+                    return;
+                }
+                deserializeBytesMessage(message, collector);
+            }
+        }
+    }
+
+    private boolean isOnlyHaveVarbinaryDataField() {
+        if (dataColumnSize == 1 && dataIndexMapping.size() == 1) {
+            int index = dataIndexMapping.get(0);
+            return isByteArrayType(tableSchema.getFieldNames()[index]);
+        }
+        return false;
+    }
+
+    private boolean isAllHeaderField() {
+        return null != headerFields && headerFields.size() == tableSchema.getFieldNames().length;
+    }
+
+    private void deserializeBytesMessage(BytesMessage message, Collector<RowData> collector) {
+        String body;
+        try {
+            body = new String(message.getData(), encoding);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+        String[] lines = StringUtils.split(body, lineDelimiter);
+        for (String line : lines) {
+            String[] data = StringUtils.splitPreserveAllTokens(line, fieldDelimiter);
+            if (dataColumnSize == 1) {
+                data = new String[1];
+                data[0] = line;
+            }
+            if (data.length < dataColumnSize) {
+                data = handleFieldMissing(data);
+            } else if (data.length > dataColumnSize) {
+                data = handleFieldIncrement(data);
+            }
+            if (data == null) {
+                continue;
+            }
+            GenericRowData rowData = new GenericRowData(totalColumnSize);
+            boolean skip = false;
+            for (int index = 0; index < totalColumnSize; index++) {
+                try {
+                    String fieldValue = getValue(message, data, line, index);
+                    rowData.setField(
+                            index,
+                            StringSerializer.deserialize(
+                                    fieldValue,
+                                    fieldTypes[index],
+                                    fieldDataTypes[index],
+                                    new HashSet<>()));
+                } catch (Exception e) {
+                    skip = handleException(rowData, index, data, e);
+                }
+            }
+            if (skip) {
+                continue;
+            }
+            collector.collect(rowData);
+        }
+    }
+
+    private boolean isHeaderField(int index) {
+        return headerFields != null && headerFields.contains(tableSchema.getFieldNames()[index]);
+    }
+
+    private String getHeaderValue(BytesMessage message, int index) {
+        Object object = message.getProperty(tableSchema.getFieldNames()[index]);
+        return object == null ? "" : (String) object;
+    }
+
+    private String getValue(BytesMessage message, String[] data, String line, int index) {
+        String fieldValue = null;
+        if (isHeaderField(index)) {
+            fieldValue = getHeaderValue(message, index);
+        } else {
+            if (dataColumnSize == 1) {
+                fieldValue = line;
+            } else {
+                if (index < data.length) {
+                    fieldValue = data[index];
+                }
+            }
+        }
+
+        return fieldValue;
+    }
+
+    private boolean isByteArrayType(String fieldName) {
+        TypeInformation<?> typeInformation =
+                tableSchema.getFieldTypes()[columnIndexMapping.get(fieldName)];
+        if (typeInformation != null) {
+            ValueType valueType = ByteSerializer.getTypeIndex(typeInformation.getTypeClass());
+            return valueType == ValueType.V_ByteArray;
+        }
+        return false;
+    }
+
+    private boolean handleException(GenericRowData row, int index, Object[] data, Exception e) {
+        boolean skip = false;
+        switch (formatErrorStrategy) {
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogExceptionTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Data format error, field type: "
+                                    + fieldTypes[index]
+                                    + "field data: "
+                                    + data[index]
+                                    + ", index: "
+                                    + index
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]",
+                            e);
+                    lastLogExceptionTime = now;
+                }
+                skip = true;
+                break;
+            case SKIP_SILENT:
+                skip = true;
+                break;
+            default:
+            case CUT:
+            case NULL:
+            case PAD:
+                row.setField(index, null);
+                break;
+            case EXCEPTION:
+                throw new RuntimeException(e);
+        }
+
+        return skip;
+    }
+
+    private String[] handleFieldMissing(String[] data) {
+        switch (fieldMissingStrategy) {
+            default:
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Field missing error, table column number: "
+                                    + totalColumnSize
+                                    + ", data column number: "
+                                    + dataColumnSize
+                                    + ", data field number: "
+                                    + data.length
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]");
+                    lastLogHandleFieldTime = now;
+                }
+                return null;
+            case SKIP_SILENT:
+                return null;
+            case CUT:
+            case NULL:
+            case PAD:
+                {
+                    String[] res = new String[totalColumnSize];
+                    for (int i = 0; i < data.length; ++i) {
+                        Object dataIndex = dataIndexMapping.get(i);
+                        if (dataIndex != null) {
+                            res[(int) dataIndex] = data[i];
+                        }
+                    }
+                    return res;
+                }
+            case EXCEPTION:
+                throw new RuntimeException();
+        }
+    }
+
+    private String[] handleFieldIncrement(String[] data) {
+        switch (fieldIncrementStrategy) {
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
+                    logger.warn(
+                            "Field increment error, table column number: "
+                                    + totalColumnSize
+                                    + ", data column number: "
+                                    + dataColumnSize
+                                    + ", data field number: "
+                                    + data.length
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]");
+                    lastLogHandleFieldTime = now;
+                }
+                return null;
+            case SKIP_SILENT:
+                return null;
+            default:
+            case CUT:
+            case NULL:
+            case PAD:
+                {
+                    String[] res = new String[totalColumnSize];
+                    for (int i = 0; i < dataColumnSize; ++i) {
+                        Object dataIndex = dataIndexMapping.get(i);
+                        if (dataIndex != null) {
+                            res[(int) dataIndex] = data[i];
+                        }
+                    }
+                    return res;
+                }
+            case EXCEPTION:
+                throw new RuntimeException();
+        }
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return InternalTypeInfo.of((RowType) tableSchema.toRowDataType().getLogicalType());
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /** Source metadata converter interface. */
+    public interface MetadataConverter extends Serializable {
+        Object read(BytesMessage message);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /** Metadata of RowData collector. */
+    public static final class MetadataCollector implements Collector<RowData>, Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        private final boolean hasMetadata;
+        private final MetadataConverter[] metadataConverters;
+
+        public transient BytesMessage message;
+        public transient Collector<RowData> collector;
+
+        public MetadataCollector(boolean hasMetadata, MetadataConverter[] metadataConverters) {
+            this.hasMetadata = hasMetadata;
+            this.metadataConverters = metadataConverters;
+        }
+
+        @Override
+        public void collect(RowData physicalRow) {
+            if (hasMetadata) {
+                final int physicalArity = physicalRow.getArity();
+                final int metadataArity = metadataConverters.length;
+                final GenericRowData producedRow =
+                        new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
+                final GenericRowData genericPhysicalRow = (GenericRowData) physicalRow;
+                for (int index = 0; index < physicalArity; index++) {
+                    producedRow.setField(index, genericPhysicalRow.getField(index));
+                }
+                for (int index = 0; index < metadataArity; index++) {
+                    producedRow.setField(
+                            index + physicalArity, metadataConverters[index].read(message));
+                }
+                collector.collect(producedRow);
+            } else {
+                collector.collect(physicalRow);
+            }
+        }
+
+        @Override
+        public void close() {
+            // nothing to do
+        }
+    }
+
+    /** Builder of {@link RowDeserializationSchema}. */
+    public static class Builder {
+
+        private TableSchema schema;
+        private DirtyDataStrategy formatErrorStrategy = DirtyDataStrategy.SKIP;
+        private DirtyDataStrategy fieldMissingStrategy = DirtyDataStrategy.SKIP;
+        private DirtyDataStrategy fieldIncrementStrategy = DirtyDataStrategy.CUT;
+        private String encoding = "UTF-8";
+        private String lineDelimiter = "\n";
+        private String fieldDelimiter = "\u0001";
+        private boolean columnErrorDebug = false;
+        private boolean hasMetadata;
+        private MetadataConverter[] metadataConverters;
+        private List<String> headerFields;
+        private Map<String, String> properties;
+
+        public Builder() {}
+
+        public Builder setTableSchema(TableSchema tableSchema) {
+            this.schema = tableSchema;
+            return this;
+        }
+
+        public Builder setFormatErrorStrategy(DirtyDataStrategy formatErrorStrategy) {
+            this.formatErrorStrategy = formatErrorStrategy;
+            return this;
+        }
+
+        public Builder setFieldMissingStrategy(DirtyDataStrategy fieldMissingStrategy) {
+            this.fieldMissingStrategy = fieldMissingStrategy;
+            return this;
+        }
+
+        public Builder setFieldIncrementStrategy(DirtyDataStrategy fieldIncrementStrategy) {
+            this.fieldIncrementStrategy = fieldIncrementStrategy;
+            return this;
+        }
+
+        public Builder setEncoding(String encoding) {
+            this.encoding = encoding;
+            return this;
+        }
+
+        public Builder setFieldDelimiter(String fieldDelimiter) {
+            this.fieldDelimiter = fieldDelimiter;
+            return this;
+        }
+
+        public Builder setLineDelimiter(String lineDelimiter) {
+            this.lineDelimiter = lineDelimiter;
+            return this;
+        }
+
+        public Builder setColumnErrorDebug(boolean columnErrorDebug) {
+            this.columnErrorDebug = columnErrorDebug;
+            return this;
+        }
+
+        public Builder setHasMetadata(boolean hasMetadata) {
+            this.hasMetadata = hasMetadata;
+            return this;
+        }
+
+        public Builder setMetadataConverters(MetadataConverter[] metadataConverters) {
+            this.metadataConverters = metadataConverters;
+            return this;
+        }
+
+        public Builder setHeaderFields(List<String> headerFields) {
+            this.headerFields = headerFields;
+            return this;
+        }
+
+        public Builder setProperties(Map<String, String> properties) {
+            this.properties = properties;
+            if (null == properties) {
+                return this;
+            }
+            Configuration configuration = new Configuration();
+            for (String key : properties.keySet()) {
+                configuration.setString(key, properties.get(key));
+            }
+            String lengthCheck = configuration.get(CollectorOption.LENGTH_CHECK);
+            switch (lengthCheck.toUpperCase()) {
+                case "SKIP":
+                    {
+                        this.setFormatErrorStrategy(DirtyDataStrategy.SKIP);
+                        this.setFieldMissingStrategy(DirtyDataStrategy.SKIP);
+                        this.setFieldIncrementStrategy(DirtyDataStrategy.SKIP);
+                    }
+                    break;
+                case "PAD":
+                    {
+                        this.setFormatErrorStrategy(DirtyDataStrategy.SKIP);
+                        this.setFieldMissingStrategy(DirtyDataStrategy.PAD);
+                        this.setFieldIncrementStrategy(DirtyDataStrategy.CUT);
+                    }
+                    break;
+                case "EXCEPTION":
+                    {
+                        this.setFormatErrorStrategy(DirtyDataStrategy.EXCEPTION);
+                        this.setFieldMissingStrategy(DirtyDataStrategy.EXCEPTION);
+                        this.setFieldIncrementStrategy(DirtyDataStrategy.EXCEPTION);
+                    }
+                    break;
+                case "SKIP_SILENT":
+                    {
+                        this.setFormatErrorStrategy(DirtyDataStrategy.SKIP_SILENT);
+                        this.setFieldMissingStrategy(DirtyDataStrategy.SKIP_SILENT);
+                        this.setFieldIncrementStrategy(DirtyDataStrategy.SKIP_SILENT);
+                    }
+                    break;
+                default:
+            }
+            this.setEncoding(configuration.getString(CollectorOption.ENCODING));
+            this.setFieldDelimiter(configuration.getString(CollectorOption.FIELD_DELIMITER));
+            this.setLineDelimiter(configuration.getString(CollectorOption.LINE_DELIMITER));
+            this.setColumnErrorDebug(configuration.getBoolean(CollectorOption.COLUMN_ERROR_DEBUG));
+            return this;
+        }
+
+        public RowDeserializationSchema build() {
+            return new RowDeserializationSchema(
+                    schema,
+                    formatErrorStrategy,
+                    fieldMissingStrategy,
+                    fieldIncrementStrategy,
+                    encoding,
+                    fieldDelimiter,
+                    lineDelimiter,
+                    columnErrorDebug,
+                    hasMetadata,
+                    metadataConverters,
+                    headerFields,
+                    properties);
+        }
+    }
+
+    /** Options for {@link RowDeserializationSchema}. */
+    public static class CollectorOption {
+        public static final ConfigOption<String> ENCODING =
+                ConfigOptions.key("encoding".toLowerCase()).defaultValue("UTF-8");
+        public static final ConfigOption<String> FIELD_DELIMITER =
+                ConfigOptions.key("fieldDelimiter".toLowerCase()).defaultValue("\u0001");
+        public static final ConfigOption<String> LINE_DELIMITER =
+                ConfigOptions.key("lineDelimiter".toLowerCase()).defaultValue("\n");
+        public static final ConfigOption<Boolean> COLUMN_ERROR_DEBUG =
+                ConfigOptions.key("columnErrorDebug".toLowerCase()).defaultValue(true);
+        public static final ConfigOption<String> LENGTH_CHECK =
+                ConfigOptions.key("lengthCheck".toLowerCase()).defaultValue("NONE");
+    }
+}
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
new file mode 100644
index 0000000..ec41fc6
--- /dev/null
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.commons.lang3.time.FastDateFormat;
+
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.CONSUMER_GROUP;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.NAME_SERVER_ADDRESS;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_COLUMN_ERROR_DEBUG;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_ENCODING;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_END_TIME;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_FIELD_DELIMITER;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_LENGTH_CHECK;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_LINE_DELIMITER;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_START_MESSAGE_OFFSET;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_START_TIME;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_START_TIME_MILLS;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_TAG;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_TIME_ZONE;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.TOPIC;
+
+/**
+ * Defines the {@link DynamicTableSourceFactory} implementation to create {@link
+ * RocketMQScanTableSource}.
+ */
+public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFactory {
+
+    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+
+    @Override
+    public String factoryIdentifier() {
+        return "rocketmq";
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
+        requiredOptions.add(TOPIC);
+        requiredOptions.add(CONSUMER_GROUP);
+        requiredOptions.add(NAME_SERVER_ADDRESS);
+        return requiredOptions;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
+        optionalOptions.add(OPTIONAL_TAG);
+        optionalOptions.add(OPTIONAL_START_MESSAGE_OFFSET);
+        optionalOptions.add(OPTIONAL_START_TIME_MILLS);
+        optionalOptions.add(OPTIONAL_START_TIME);
+        optionalOptions.add(OPTIONAL_END_TIME);
+        optionalOptions.add(OPTIONAL_TIME_ZONE);
+        optionalOptions.add(OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS);
+        optionalOptions.add(OPTIONAL_ENCODING);
+        optionalOptions.add(OPTIONAL_FIELD_DELIMITER);
+        optionalOptions.add(OPTIONAL_LINE_DELIMITER);
+        optionalOptions.add(OPTIONAL_COLUMN_ERROR_DEBUG);
+        optionalOptions.add(OPTIONAL_LENGTH_CHECK);
+        return optionalOptions;
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        transformContext(this, context);
+        FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
+        helper.validate();
+        Map<String, String> rawProperties = context.getCatalogTable().getOptions();
+        Configuration configuration = Configuration.fromMap(rawProperties);
+        String topic = configuration.getString(TOPIC);
+        String consumerGroup = configuration.getString(CONSUMER_GROUP);
+        String nameServerAddress = configuration.getString(NAME_SERVER_ADDRESS);
+        String tag = configuration.getString(OPTIONAL_TAG);
+        int startMessageOffset = configuration.getInteger(OPTIONAL_START_MESSAGE_OFFSET);
+        long startTimeMs = configuration.getLong(OPTIONAL_START_TIME_MILLS);
+        String startDateTime = configuration.getString(OPTIONAL_START_TIME);
+        String timeZone = configuration.getString(OPTIONAL_TIME_ZONE);
+        long startTime = startTimeMs;
+        if (startTime == -1) {
+            if (!StringUtils.isNullOrWhitespaceOnly(startDateTime)) {
+                try {
+                    startTime = parseDateString(startDateTime, timeZone);
+                } catch (ParseException e) {
+                    throw new RuntimeException(
+                            String.format(
+                                    "Incorrect datetime format: %s, pls use ISO-8601 "
+                                            + "complete date plus hours, minutes and seconds format:%s.",
+                                    startDateTime, DATE_FORMAT),
+                            e);
+                }
+            }
+        }
+        long stopInMs = Long.MAX_VALUE;
+        String endDateTime = configuration.getString(OPTIONAL_END_TIME);
+        if (!StringUtils.isNullOrWhitespaceOnly(endDateTime)) {
+            try {
+                stopInMs = parseDateString(endDateTime, timeZone);
+            } catch (ParseException e) {
+                throw new RuntimeException(
+                        String.format(
+                                "Incorrect datetime format: %s, pls use ISO-8601 "
+                                        + "complete date plus hours, minutes and seconds format:%s.",
+                                endDateTime, DATE_FORMAT),
+                        e);
+            }
+            Preconditions.checkArgument(
+                    stopInMs >= startTime, "Start time should be less than stop time.");
+        }
+        long partitionDiscoveryIntervalMs =
+                configuration.getLong(OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS);
+        DescriptorProperties descriptorProperties = new DescriptorProperties();
+        descriptorProperties.putProperties(rawProperties);
+        TableSchema physicalSchema =
+                TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+        descriptorProperties.putTableSchema("schema", physicalSchema);
+        return new RocketMQScanTableSource(
+                descriptorProperties,
+                physicalSchema,
+                topic,
+                consumerGroup,
+                nameServerAddress,
+                tag,
+                stopInMs,
+                startMessageOffset,
+                startMessageOffset < 0 ? startTime : -1L,
+                partitionDiscoveryIntervalMs);
+    }
+
+    private void transformContext(
+            DynamicTableFactory factory, DynamicTableFactory.Context context) {
+        Map<String, String> catalogOptions = context.getCatalogTable().getOptions();
+        Map<String, String> convertedOptions =
+                normalizeOptionCaseAsFactory(factory, catalogOptions);
+        catalogOptions.clear();
+        for (Map.Entry<String, String> entry : convertedOptions.entrySet()) {
+            catalogOptions.put(entry.getKey(), entry.getValue());
+        }
+    }
+
+    private Map<String, String> normalizeOptionCaseAsFactory(
+            Factory factory, Map<String, String> options) {
+        Map<String, String> normalizedOptions = new HashMap<>();
+        Map<String, String> requiredOptionKeysLowerCaseToOriginal =
+                factory.requiredOptions().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        option -> option.key().toLowerCase(), ConfigOption::key));
+        Map<String, String> optionalOptionKeysLowerCaseToOriginal =
+                factory.optionalOptions().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        option -> option.key().toLowerCase(), ConfigOption::key));
+        for (Map.Entry<String, String> entry : options.entrySet()) {
+            final String catalogOptionKey = entry.getKey();
+            final String catalogOptionValue = entry.getValue();
+            normalizedOptions.put(
+                    requiredOptionKeysLowerCaseToOriginal.containsKey(
+                                    catalogOptionKey.toLowerCase())
+                            ? requiredOptionKeysLowerCaseToOriginal.get(
+                                    catalogOptionKey.toLowerCase())
+                            : optionalOptionKeysLowerCaseToOriginal.getOrDefault(
+                                    catalogOptionKey.toLowerCase(), catalogOptionKey),
+                    catalogOptionValue);
+        }
+        return normalizedOptions;
+    }
+
+    private Long parseDateString(String dateString, String timeZone) throws ParseException {
+        FastDateFormat simpleDateFormat =
+                FastDateFormat.getInstance(DATE_FORMAT, TimeZone.getTimeZone(timeZone));
+        return simpleDateFormat.parse(dateString).getTime();
+    }
+}
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
new file mode 100644
index 0000000..37ab6a5
--- /dev/null
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.table;
+
+import org.apache.rocketmq.flink.source.RocketMQSource;
+import org.apache.rocketmq.flink.source.reader.deserializer.BytesMessage;
+import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema;
+import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQRowDeserializationSchema;
+import org.apache.rocketmq.flink.source.reader.deserializer.RowDeserializationSchema.MetadataConverter;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.flink.api.connector.source.Boundedness.BOUNDED;
+import static org.apache.flink.api.connector.source.Boundedness.CONTINUOUS_UNBOUNDED;
+
+/** Defines the scan table source of RocketMQ. */
+public class RocketMQScanTableSource implements ScanTableSource, SupportsReadingMetadata {
+
+    private final DescriptorProperties properties;
+    private final TableSchema schema;
+
+    private final String topic;
+    private final String consumerGroup;
+    private final String nameServerAddress;
+    private final String tag;
+
+    private final long stopInMs;
+    private final long partitionDiscoveryIntervalMs;
+    private final long startMessageOffset;
+    private final long startTime;
+
+    private List<String> metadataKeys;
+
+    public RocketMQScanTableSource(
+            DescriptorProperties properties,
+            TableSchema schema,
+            String topic,
+            String consumerGroup,
+            String nameServerAddress,
+            String tag,
+            long stopInMs,
+            long startMessageOffset,
+            long startTime,
+            long partitionDiscoveryIntervalMs) {
+        this.properties = properties;
+        this.schema = schema;
+        this.topic = topic;
+        this.consumerGroup = consumerGroup;
+        this.nameServerAddress = nameServerAddress;
+        this.tag = tag;
+        this.stopInMs = stopInMs;
+        this.startMessageOffset = startMessageOffset;
+        this.startTime = startTime;
+        this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
+        this.metadataKeys = Collections.emptyList();
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        return SourceProvider.of(
+                new RocketMQSource<>(
+                        topic,
+                        consumerGroup,
+                        nameServerAddress,
+                        tag,
+                        stopInMs,
+                        startTime,
+                        startMessageOffset < 0 ? 0 : startMessageOffset,
+                        partitionDiscoveryIntervalMs,
+                        isBounded() ? BOUNDED : CONTINUOUS_UNBOUNDED,
+                        createDeserializationSchema()));
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+        Stream.of(ReadableMetadata.values())
+                .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
+        return metadataMap;
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        this.metadataKeys = metadataKeys;
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        RocketMQScanTableSource tableSource =
+                new RocketMQScanTableSource(
+                        properties,
+                        schema,
+                        topic,
+                        consumerGroup,
+                        nameServerAddress,
+                        tag,
+                        stopInMs,
+                        startMessageOffset,
+                        startTime,
+                        partitionDiscoveryIntervalMs);
+        tableSource.metadataKeys = metadataKeys;
+        return tableSource;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "RocketMQScanTableSource";
+    }
+
+    private RocketMQDeserializationSchema<RowData> createDeserializationSchema() {
+        final MetadataConverter[] metadataConverters =
+                metadataKeys.stream()
+                        .map(
+                                k ->
+                                        Stream.of(ReadableMetadata.values())
+                                                .filter(rm -> rm.key.equals(k))
+                                                .findFirst()
+                                                .orElseThrow(IllegalStateException::new))
+                        .map(m -> m.converter)
+                        .toArray(MetadataConverter[]::new);
+        return new RocketMQRowDeserializationSchema(
+                schema, properties.asMap(), metadataKeys.size() > 0, metadataConverters);
+    }
+
+    private boolean isBounded() {
+        return stopInMs != Long.MAX_VALUE;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+    enum ReadableMetadata {
+        TOPIC(
+                "topic",
+                DataTypes.STRING().notNull(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(BytesMessage message) {
+                        return StringData.fromString(
+                                String.valueOf(message.getProperty("__topic__")));
+                    }
+                });
+
+        final String key;
+
+        final DataType dataType;
+
+        final MetadataConverter converter;
+
+        ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.converter = converter;
+        }
+    }
+}
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/util/ByteSerializer.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/util/ByteSerializer.java
new file mode 100644
index 0000000..358cb84
--- /dev/null
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/util/ByteSerializer.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.util;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.Period;
+
+/** BytesSerializer is responsible to deserialize field from byte array. */
+public class ByteSerializer {
+
+    public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
+
+    public static Object deserialize(byte[] value, ValueType type) {
+        return deserialize(value, type, DEFAULT_CHARSET);
+    }
+
+    public static Object deserialize(byte[] value, ValueType type, Charset charset) {
+        switch (type) {
+            case V_String:
+                return null == value ? "" : new String(value, charset);
+            case V_Timestamp: // sql.Timestamp encoded as long
+                return new Timestamp(ByteUtils.toLong(value));
+            case V_Date: // sql.Date encoded as long
+                return new Date(ByteUtils.toLong(value));
+            case V_Time: // sql.Time encoded as long
+                return new Time(ByteUtils.toLong(value));
+            case V_BigDecimal:
+                return ByteUtils.toBigDecimal(value);
+            default:
+                return commonDeserialize(value, type);
+        }
+    }
+
+    private static Object commonDeserialize(byte[] value, ValueType type) {
+        switch (type) {
+            case V_ByteArray: // byte[]
+                return value;
+            case V_Byte: // byte
+                return null == value ? (byte) '\0' : value[0];
+            case V_Short:
+                return ByteUtils.toShort(value);
+            case V_Integer:
+                return ByteUtils.toInt(value);
+            case V_Long:
+                return ByteUtils.toLong(value);
+            case V_Float:
+                return ByteUtils.toFloat(value);
+            case V_Double:
+                return ByteUtils.toDouble(value);
+            case V_Boolean:
+                return ByteUtils.toBoolean(value);
+            case V_BigInteger:
+                return new BigInteger(value);
+            default:
+                throw new IllegalArgumentException();
+        }
+    }
+
+    public static ValueType getTypeIndex(Class<?> clazz) {
+        if (byte[].class.equals(clazz)) {
+            return ValueType.V_ByteArray;
+        } else if (String.class.equals(clazz)) {
+            return ValueType.V_String;
+        } else if (Byte.class.equals(clazz)) {
+            return ValueType.V_Byte;
+        } else if (Short.class.equals(clazz)) {
+            return ValueType.V_Short;
+        } else if (Integer.class.equals(clazz)) {
+            return ValueType.V_Integer;
+        } else if (Long.class.equals(clazz)) {
+            return ValueType.V_Long;
+        } else if (Float.class.equals(clazz)) {
+            return ValueType.V_Float;
+        } else if (Double.class.equals(clazz)) {
+            return ValueType.V_Double;
+        } else if (Boolean.class.equals(clazz)) {
+            return ValueType.V_Boolean;
+        } else if (Timestamp.class.equals(clazz)) {
+            return ValueType.V_Timestamp;
+        } else if (Date.class.equals(clazz)) {
+            return ValueType.V_Date;
+        } else if (Time.class.equals(clazz)) {
+            return ValueType.V_Time;
+        } else if (BigDecimal.class.equals(clazz)) {
+            return ValueType.V_BigDecimal;
+        } else if (BigInteger.class.equals(clazz)) {
+            return ValueType.V_BigInteger;
+        } else if (LocalDateTime.class.equals(clazz)) {
+            return ValueType.V_LocalDateTime;
+        } else if (LocalDate.class.equals(clazz)) {
+            return ValueType.V_LocalDate;
+        } else if (Duration.class.equals(clazz)) {
+            return ValueType.V_Duration;
+        } else if (LocalTime.class.equals(clazz)) {
+            return ValueType.V_LocalTime;
+        } else if (Period.class.equals(clazz)) {
+            return ValueType.V_Period;
+        } else if (OffsetDateTime.class.equals(clazz)) {
+            return ValueType.V_OffsetDateTime;
+        } else {
+            return ValueType.Unsupported;
+        }
+    }
+
+    /** Value Type. */
+    public enum ValueType {
+        V_ByteArray,
+        V_String,
+        V_Byte,
+        V_Short,
+        V_Integer,
+        V_Long,
+        V_Float,
+        V_Double,
+        V_Boolean,
+        V_Timestamp,
+        V_Date,
+        V_Time,
+        V_BigDecimal,
+        V_BigInteger,
+        V_LocalDateTime,
+        V_LocalDate,
+        V_Duration,
+        V_LocalTime,
+        V_Period,
+        V_OffsetDateTime,
+        Unsupported
+    }
+}
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/util/ByteUtils.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/util/ByteUtils.java
new file mode 100644
index 0000000..6e223a3
--- /dev/null
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/util/ByteUtils.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.util;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+/** Utility class to for operations related to bytes. */
+public class ByteUtils {
+
+    /**
+     * Converts a byte array to an int value.
+     *
+     * @param bytes byte array
+     * @return the int value
+     */
+    public static int toInt(byte[] bytes) {
+        return toInt(bytes, 0);
+    }
+
+    /**
+     * Converts a byte array to an int value.
+     *
+     * @param bytes byte array
+     * @param offset offset into array
+     * @return the int value
+     * @throws IllegalArgumentException if there's not enough room in the array at the offset
+     *     indicated.
+     */
+    public static int toInt(byte[] bytes, int offset) {
+        if (offset + Integer.BYTES > bytes.length) {
+            throw explainWrongLengthOrOffset(bytes, offset, Integer.BYTES, Integer.BYTES);
+        }
+        int n = 0;
+        for (int i = offset; i < (offset + Integer.BYTES); i++) {
+            n <<= 8;
+            n ^= bytes[i] & 0xFF;
+        }
+        return n;
+    }
+
+    /**
+     * Convert a byte array to a boolean.
+     *
+     * @param b array
+     * @return True or false.
+     */
+    public static boolean toBoolean(final byte[] b) {
+        return toBoolean(b, 0);
+    }
+
+    /**
+     * Convert a byte array to a boolean.
+     *
+     * @param b array
+     * @param offset offset into array
+     * @return True or false.
+     */
+    public static boolean toBoolean(final byte[] b, final int offset) {
+        if (offset + 1 > b.length) {
+            throw explainWrongLengthOrOffset(b, offset, 1, 1);
+        }
+        return b[offset] != (byte) 0;
+    }
+
+    /**
+     * Converts a byte array to a long value.
+     *
+     * @param bytes array
+     * @return the long value
+     */
+    public static long toLong(byte[] bytes) {
+        return toLong(bytes, 0);
+    }
+
+    /**
+     * Converts a byte array to a long value.
+     *
+     * @param bytes array of bytes
+     * @param offset offset into array
+     * @return the long value
+     * @throws IllegalArgumentException if there's not enough room in the array at the offset
+     *     indicated.
+     */
+    public static long toLong(byte[] bytes, int offset) {
+        if (offset + Long.BYTES > bytes.length) {
+            throw explainWrongLengthOrOffset(bytes, offset, Long.BYTES, Long.BYTES);
+        }
+        long l = 0;
+        for (int i = offset; i < offset + Long.BYTES; i++) {
+            l <<= 8;
+            l ^= bytes[i] & 0xFF;
+        }
+        return l;
+    }
+
+    /**
+     * Presumes float encoded as IEEE 754 floating-point "single format".
+     *
+     * @param bytes byte array
+     * @return Float made from passed byte array.
+     */
+    public static float toFloat(byte[] bytes) {
+        return toFloat(bytes, 0);
+    }
+
+    /**
+     * Presumes float encoded as IEEE 754 floating-point "single format".
+     *
+     * @param bytes array to convert
+     * @param offset offset into array
+     * @return Float made from passed byte array.
+     */
+    public static float toFloat(byte[] bytes, int offset) {
+        return Float.intBitsToFloat(toInt(bytes, offset));
+    }
+
+    /**
+     * Parse a byte array to double.
+     *
+     * @param bytes byte array
+     * @return Return double made from passed bytes.
+     */
+    public static double toDouble(final byte[] bytes) {
+        return toDouble(bytes, 0);
+    }
+
+    /**
+     * Parse a byte array to double.
+     *
+     * @param bytes byte array
+     * @param offset offset where double is
+     * @return Return double made from passed bytes.
+     */
+    public static double toDouble(final byte[] bytes, final int offset) {
+        return Double.longBitsToDouble(toLong(bytes, offset));
+    }
+
+    /**
+     * Converts a byte array to a short value.
+     *
+     * @param bytes byte array
+     * @return the short value
+     */
+    public static short toShort(byte[] bytes) {
+        return toShort(bytes, 0);
+    }
+
+    /**
+     * Converts a byte array to a short value.
+     *
+     * @param bytes byte array
+     * @param offset offset into array
+     * @return the short value
+     * @throws IllegalArgumentException if there's not enough room in the array at the offset
+     *     indicated.
+     */
+    public static short toShort(byte[] bytes, int offset) {
+        if (offset + Short.BYTES > bytes.length) {
+            throw explainWrongLengthOrOffset(bytes, offset, Short.BYTES, Short.BYTES);
+        }
+        short n = 0;
+        n ^= bytes[offset] & 0xFF;
+        n <<= 8;
+        n ^= bytes[offset + 1] & 0xFF;
+        return n;
+    }
+
+    // ---------------------------------------------------------------------------------------------------------
+
+    private static IllegalArgumentException explainWrongLengthOrOffset(
+            final byte[] bytes, final int offset, final int length, final int expectedLength) {
+        String exceptionMessage;
+        if (length != expectedLength) {
+            exceptionMessage = "Wrong length: " + length + ", expected " + expectedLength;
+        } else {
+            exceptionMessage =
+                    "offset ("
+                            + offset
+                            + ") + length ("
+                            + length
+                            + ") exceed the"
+                            + " capacity of the array: "
+                            + bytes.length;
+        }
+        return new IllegalArgumentException(exceptionMessage);
+    }
+
+    public static BigDecimal toBigDecimal(byte[] bytes) {
+        return toBigDecimal(bytes, 0, bytes.length);
+    }
+
+    public static BigDecimal toBigDecimal(byte[] bytes, int offset, int length) {
+        if (bytes != null && length >= 5 && offset + length <= bytes.length) {
+            int scale = toInt(bytes, offset);
+            byte[] tcBytes = new byte[length - 4];
+            System.arraycopy(bytes, offset + 4, tcBytes, 0, length - 4);
+            return new BigDecimal(new BigInteger(tcBytes), scale);
+        } else {
+            return null;
+        }
+    }
+}
diff --git a/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/util/StringSerializer.java b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/util/StringSerializer.java
new file mode 100644
index 0000000..b468ac9
--- /dev/null
+++ b/rocketmq-flink/src/main/java/org/apache/rocketmq/flink/source/util/StringSerializer.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.util;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.data.util.DataFormatConverters.TimestampConverter;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+
+import sun.misc.BASE64Decoder;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Set;
+
+/** String serializer. */
+public class StringSerializer {
+
+    public static TimestampConverter timestampConverter = new TimestampConverter(3);
+    private static final BASE64Decoder decoder = new BASE64Decoder();
+
+    public static Object deserialize(
+            String value,
+            ByteSerializer.ValueType type,
+            DataType dataType,
+            Set<String> nullValues) {
+        return deserialize(value, type, dataType, nullValues, false);
+    }
+
+    public static Object deserialize(
+            String value,
+            ByteSerializer.ValueType type,
+            DataType dataType,
+            Set<String> nullValues,
+            Boolean isRGData) {
+        if (null != nullValues && nullValues.contains(value)) {
+            return null;
+        }
+        switch (type) {
+            case V_ByteArray: // byte[]
+                if (isRGData) {
+                    byte[] bytes = null;
+                    try {
+                        bytes = decoder.decodeBuffer(value);
+                    } catch (Exception e) {
+                        //
+                    }
+                    return bytes;
+                } else {
+                    return value.getBytes();
+                }
+            case V_String:
+                return BinaryStringData.fromString(value);
+            case V_Byte: // byte
+                return null == value ? null : Byte.parseByte(value);
+            case V_Short:
+                return null == value ? null : Short.parseShort(value);
+            case V_Integer:
+                return null == value ? null : Integer.parseInt(value);
+            case V_Long:
+                return null == value ? null : Long.parseLong(value);
+            case V_Float:
+                return null == value ? null : Float.parseFloat(value);
+            case V_Double:
+                return null == value ? null : Double.parseDouble(value);
+            case V_Boolean:
+                return null == value ? null : parseBoolean(value);
+            case V_Timestamp: // sql.Timestamp encoded as long
+                if (isRGData) {
+                    return null == value ? null : Long.parseLong(value);
+                }
+                if (null == value) {
+                    return null;
+                } else {
+                    try {
+                        return timestampConverter.toInternal(new Timestamp(Long.parseLong(value)));
+                    } catch (NumberFormatException e) {
+                        return timestampConverter.toInternal(Timestamp.valueOf(value));
+                    }
+                }
+            case V_Date: // sql.Date encoded as long
+                if (isRGData) {
+                    return null == value ? null : Long.parseLong(value);
+                }
+                return null == value
+                        ? null
+                        : DataFormatConverters.DateConverter.INSTANCE.toInternal(
+                                Date.valueOf(value));
+            case V_Time: // sql.Time encoded as long
+                if (isRGData) {
+                    return null == value ? null : Long.parseLong(value);
+                }
+                return null == value
+                        ? null
+                        : DataFormatConverters.TimeConverter.INSTANCE.toInternal(
+                                new Time(Long.parseLong(value)));
+            case V_BigDecimal:
+                DecimalType decimalType = (DecimalType) dataType.getLogicalType();
+                return value == null
+                        ? null
+                        : DecimalData.fromBigDecimal(
+                                new BigDecimal(value),
+                                decimalType.getPrecision(),
+                                decimalType.getScale());
+            case V_BigInteger:
+                return null == value ? null : new BigInteger(value);
+
+            default:
+                throw new IllegalArgumentException();
+        }
+    }
+
+    public static Object deserialize(
+            String value, ByteSerializer.ValueType type, DataType dataType, Boolean isRGData) {
+        return deserialize(value, type, dataType, null, isRGData);
+    }
+
+    public static Boolean parseBoolean(String s) {
+        if (s != null) {
+            if (s.equalsIgnoreCase("true") || s.equalsIgnoreCase("false")) {
+                return Boolean.valueOf(s);
+            }
+
+            if (s.equals("1")) {
+                return Boolean.TRUE;
+            }
+
+            if (s.equals("0")) {
+                return Boolean.FALSE;
+            }
+        }
+
+        throw new IllegalArgumentException();
+    }
+}
diff --git a/rocketmq-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/rocketmq-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..32de8b2
--- /dev/null
+++ b/rocketmq-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.rocketmq.flink.source.table.RocketMQDynamicTableSourceFactory
\ No newline at end of file
diff --git a/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRowDeserializationSchemaTest.java b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRowDeserializationSchemaTest.java
new file mode 100644
index 0000000..a904b04
--- /dev/null
+++ b/rocketmq-flink/src/test/java/org/apache/rocketmq/flink/source/reader/deserializer/RocketMQRowDeserializationSchemaTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.reader.deserializer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+import org.powermock.reflect.Whitebox;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+
+/** Test for {@link RocketMQRowDeserializationSchema}. */
+public class RocketMQRowDeserializationSchemaTest {
+
+    @Test
+    public void testDeserialize() {
+        TableSchema tableSchema =
+                new TableSchema.Builder()
+                        .field("int", DataTypes.INT())
+                        .field("varchar", DataTypes.VARCHAR(100))
+                        .field("bool", DataTypes.BOOLEAN())
+                        .field("char", DataTypes.CHAR(5))
+                        .field("tinyint", DataTypes.TINYINT())
+                        .field("decimal", DataTypes.DECIMAL(10, 5))
+                        .field("smallint", DataTypes.SMALLINT())
+                        .field("bigint", DataTypes.BIGINT())
+                        .field("float", DataTypes.FLOAT())
+                        .field("double", DataTypes.DOUBLE())
+                        .field("date", DataTypes.DATE())
+                        .field("time", DataTypes.TIME())
+                        .field("timestamp", DataTypes.TIMESTAMP())
+                        .build();
+        RocketMQRowDeserializationSchema recordDeserializer =
+                new RocketMQRowDeserializationSchema(tableSchema, new HashMap<>(), false, null);
+        RowDeserializationSchema sourceDeserializer = mock(RowDeserializationSchema.class);
+        InitializationContext initializationContext = mock(InitializationContext.class);
+        doNothing().when(sourceDeserializer).open(initializationContext);
+        Whitebox.setInternalState(recordDeserializer, "deserializationSchema", sourceDeserializer);
+        recordDeserializer.open(initializationContext);
+        MessageExt firstMsg =
+                new MessageExt(
+                        1,
+                        System.currentTimeMillis(),
+                        InetSocketAddress.createUnresolved("localhost", 8080),
+                        System.currentTimeMillis(),
+                        InetSocketAddress.createUnresolved("localhost", 8088),
+                        "184019387");
+        firstMsg.setBody("test_deserializer_raw_messages_1".getBytes());
+        MessageExt secondMsg =
+                new MessageExt(
+                        1,
+                        System.currentTimeMillis(),
+                        InetSocketAddress.createUnresolved("localhost", 8081),
+                        System.currentTimeMillis(),
+                        InetSocketAddress.createUnresolved("localhost", 8087),
+                        "284019387");
+        secondMsg.setBody("test_deserializer_raw_messages_2".getBytes());
+        MessageExt thirdMsg =
+                new MessageExt(
+                        1,
+                        System.currentTimeMillis(),
+                        InetSocketAddress.createUnresolved("localhost", 8082),
+                        System.currentTimeMillis(),
+                        InetSocketAddress.createUnresolved("localhost", 8086),
+                        "384019387");
+        thirdMsg.setBody("test_deserializer_raw_messages_3".getBytes());
+        List<MessageExt> messages = Arrays.asList(firstMsg, secondMsg, thirdMsg);
+        Collector<RowData> collector = mock(Collector.class);
+        recordDeserializer.deserialize(messages, collector);
+
+        assertEquals(3, recordDeserializer.getBytesMessages().size());
+        assertEquals(firstMsg.getBody(), recordDeserializer.getBytesMessages().get(0).getData());
+        assertEquals(
+                String.valueOf(firstMsg.getStoreTimestamp()),
+                recordDeserializer.getBytesMessages().get(0).getProperty("__store_timestamp__"));
+        assertEquals(
+                String.valueOf(firstMsg.getBornTimestamp()),
+                recordDeserializer.getBytesMessages().get(0).getProperty("__born_timestamp__"));
+        assertEquals(
+                String.valueOf(firstMsg.getQueueId()),
+                recordDeserializer.getBytesMessages().get(0).getProperty("__queue_id__"));
+        assertEquals(
+                String.valueOf(firstMsg.getQueueOffset()),
+                recordDeserializer.getBytesMessages().get(0).getProperty("__queue_offset__"));
+        assertEquals(secondMsg.getBody(), recordDeserializer.getBytesMessages().get(1).getData());
+        assertEquals(
+                String.valueOf(secondMsg.getStoreTimestamp()),
+                recordDeserializer.getBytesMessages().get(1).getProperty("__store_timestamp__"));
+        assertEquals(
+                String.valueOf(secondMsg.getBornTimestamp()),
+                recordDeserializer.getBytesMessages().get(1).getProperty("__born_timestamp__"));
+        assertEquals(
+                String.valueOf(secondMsg.getQueueId()),
+                recordDeserializer.getBytesMessages().get(1).getProperty("__queue_id__"));
+        assertEquals(
+                String.valueOf(secondMsg.getQueueOffset()),
+                recordDeserializer.getBytesMessages().get(1).getProperty("__queue_offset__"));
+        assertEquals(thirdMsg.getBody(), recordDeserializer.getBytesMessages().get(2).getData());
+        assertEquals(
+                String.valueOf(thirdMsg.getStoreTimestamp()),
+                recordDeserializer.getBytesMessages().get(2).getProperty("__store_timestamp__"));
+        assertEquals(
+                String.valueOf(thirdMsg.getBornTimestamp()),
+                recordDeserializer.getBytesMessages().get(2).getProperty("__born_timestamp__"));
+        assertEquals(
+                String.valueOf(thirdMsg.getQueueId()),
+                recordDeserializer.getBytesMessages().get(2).getProperty("__queue_id__"));
+        assertEquals(
+                String.valueOf(thirdMsg.getQueueOffset()),
+                recordDeserializer.getBytesMessages().get(2).getProperty("__queue_offset__"));
+    }
+}