[FLINK-33208] Support the writable metadata timestamp. This closes #24

Co-authored-by: Tan-JiaLiang <tanjialiang1997@gmail.com>
Co-authored-by: Ferenc Csaky <ferenc.csaky@pm.me>
diff --git a/docs/content.zh/docs/connectors/table/hbase.md b/docs/content.zh/docs/connectors/table/hbase.md
index 79aae7b..db68d37 100644
--- a/docs/content.zh/docs/connectors/table/hbase.md
+++ b/docs/content.zh/docs/connectors/table/hbase.md
@@ -75,6 +75,33 @@
 ON myTopic.key = hTable.rowkey;
 ```
 
+可用的元数据
+------------------
+
+以下的连接器元数据可以在表定义中通过元数据列的形式获取。
+
+`R/W` 列定义了一个元数据是可读的(`R`)还是可写的(`W`)。
+只读列必须声明为 `VIRTUAL` 以在 `INSERT INTO` 操作中排除它们。
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Key</th>
+      <th class="text-center" style="width: 30%">Data Type</th>
+      <th class="text-center" style="width: 40%">Description</th>
+      <th class="text-center" style="width: 5%">R/W</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>timestamp</code></td>
+      <td><code>TIMESTAMP_LTZ(3) NOT NULL</code></td>
+      <td>HBase记录的时间戳。</td>
+      <td><code>W</code></td>
+    </tr>
+    </tbody>
+</table>
+
 连接器参数
 ----------------
 
diff --git a/docs/content/docs/connectors/table/hbase.md b/docs/content/docs/connectors/table/hbase.md
index 31f6895..8c723da 100644
--- a/docs/content/docs/connectors/table/hbase.md
+++ b/docs/content/docs/connectors/table/hbase.md
@@ -77,6 +77,33 @@
 ON myTopic.key = hTable.rowkey;
 ```
 
+Available Metadata
+------------------
+
+The following connector metadata can be accessed as metadata columns in a table definition.
+
+The `R/W` column defines whether a metadata field is readable (`R`) and/or writable (`W`).
+Read-only columns must be declared `VIRTUAL` to exclude them during an `INSERT INTO` operation.
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Key</th>
+      <th class="text-center" style="width: 30%">Data Type</th>
+      <th class="text-center" style="width: 40%">Description</th>
+      <th class="text-center" style="width: 5%">R/W</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>timestamp</code></td>
+      <td><code>TIMESTAMP_LTZ(3) NOT NULL</code></td>
+      <td>Timestamp for the HBase mutation.</td>
+      <td><code>W</code></td>
+    </tr>
+    </tbody>
+</table>
+
 Connector Options
 ----------------
 
diff --git a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
index 5321bf2..19e4d1a 100644
--- a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
+++ b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
@@ -122,11 +122,13 @@
         Configuration hbaseConf = getHBaseConfiguration(tableOptions);
         HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
-        HBaseTableSchema hbaseSchema =
-                HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
 
         return new HBaseDynamicTableSink(
-                tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral);
+                tableName,
+                context.getPhysicalRowDataType(),
+                hbaseConf,
+                hBaseWriteOptions,
+                nullStringLiteral);
     }
 
     @Override
diff --git a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java
index 0dec937..456948f 100644
--- a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java
+++ b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java
@@ -23,36 +23,49 @@
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
 import org.apache.flink.connector.hbase.sink.HBaseSinkFunction;
 import org.apache.flink.connector.hbase.sink.RowDataToMutationConverter;
+import org.apache.flink.connector.hbase.sink.WritableMetadata;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
 
 import org.apache.hadoop.conf.Configuration;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 /** HBase table sink implementation. */
 @Internal
-public class HBaseDynamicTableSink implements DynamicTableSink {
+public class HBaseDynamicTableSink implements DynamicTableSink, SupportsWritingMetadata {
 
     private final HBaseTableSchema hbaseTableSchema;
     private final String nullStringLiteral;
     private final Configuration hbaseConf;
     private final HBaseWriteOptions writeOptions;
     private final String tableName;
+    private final DataType physicalDataType;
+
+    /** Metadata that is appended at the end of a physical sink row. */
+    private List<String> metadataKeys;
 
     public HBaseDynamicTableSink(
             String tableName,
-            HBaseTableSchema hbaseTableSchema,
+            DataType physicalDataType,
             Configuration hbaseConf,
             HBaseWriteOptions writeOptions,
             String nullStringLiteral) {
-        this.hbaseTableSchema = hbaseTableSchema;
-        this.nullStringLiteral = nullStringLiteral;
+        this.tableName = tableName;
+        this.physicalDataType = physicalDataType;
+        this.hbaseTableSchema = HBaseTableSchema.fromDataType(physicalDataType);
+        this.metadataKeys = Collections.emptyList();
         this.hbaseConf = hbaseConf;
         this.writeOptions = writeOptions;
-        this.tableName = tableName;
+        this.nullStringLiteral = nullStringLiteral;
     }
 
     @Override
@@ -63,6 +76,8 @@
                         hbaseConf,
                         new RowDataToMutationConverter(
                                 hbaseTableSchema,
+                                physicalDataType,
+                                metadataKeys,
                                 nullStringLiteral,
                                 writeOptions.isIgnoreNullValue()),
                         writeOptions.getBufferFlushMaxSizeInBytes(),
@@ -84,9 +99,19 @@
     }
 
     @Override
+    public Map<String, DataType> listWritableMetadata() {
+        return WritableMetadata.list();
+    }
+
+    @Override
+    public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
+        this.metadataKeys = metadataKeys;
+    }
+
+    @Override
     public DynamicTableSink copy() {
         return new HBaseDynamicTableSink(
-                tableName, hbaseTableSchema, hbaseConf, writeOptions, nullStringLiteral);
+                tableName, physicalDataType, hbaseConf, writeOptions, nullStringLiteral);
     }
 
     @Override
diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
index 31290a7..1c53187 100644
--- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
@@ -373,6 +373,56 @@
     }
 
     @Test
+    public void testTableSinkWithTimestampMetadata() throws Exception {
+        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+        tEnv.executeSql(
+                "CREATE TABLE hTableForSink ("
+                        + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+                        + " family1 ROW<col1 INT>,"
+                        + " version TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-1.4',"
+                        + " 'table-name' = '"
+                        + TEST_TABLE_5
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")");
+
+        String insert =
+                "INSERT INTO hTableForSink VALUES"
+                        + "(1, ROW(1), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
+                        + "(2, ROW(2), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
+                        + "(3, ROW(3), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
+                        + "(1, ROW(10), TO_TIMESTAMP_LTZ(1696767943269, 3)),"
+                        + "(2, ROW(20), TO_TIMESTAMP_LTZ(1696767943271, 3))";
+        tEnv.executeSql(insert).await();
+
+        tEnv.executeSql(
+                "CREATE TABLE hTableForQuery ("
+                        + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+                        + " family1 ROW<col1 INT>"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-1.4',"
+                        + " 'table-name' = '"
+                        + TEST_TABLE_5
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")");
+        TableResult result = tEnv.executeSql("SELECT rowkey, family1.col1 FROM hTableForQuery");
+        List<Row> results = CollectionUtil.iteratorToList(result.collect());
+
+        String expected = "+I[1, 1]\n+I[2, 20]\n+I[3, 3]\n";
+
+        TestBaseUtils.compareResultAsText(results, expected);
+    }
+
+    @Test
     public void testTableSourceSinkWithDDL() throws Exception {
         StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
@@ -554,7 +604,12 @@
                 new HBaseSinkFunction<>(
                         TEST_NOT_EXISTS_TABLE,
                         hbaseConf,
-                        new RowDataToMutationConverter(tableSchema, "null", false),
+                        new RowDataToMutationConverter(
+                                tableSchema,
+                                tableSchema.convertToDataType(),
+                                Collections.emptyList(),
+                                "null",
+                                false),
                         2 * 1024 * 1024,
                         1000,
                         1000);
diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
index 530285d..c3512a9 100644
--- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
+++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
@@ -44,6 +44,7 @@
     protected static final String TEST_TABLE_2 = "testTable2";
     protected static final String TEST_TABLE_3 = "testTable3";
     protected static final String TEST_TABLE_4 = "testTable4";
+    protected static final String TEST_TABLE_5 = "testTable5";
     protected static final String TEST_EMPTY_TABLE = "testEmptyTable";
     protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable";
 
@@ -96,6 +97,7 @@
         createHBaseTable2();
         createHBaseTable3();
         createHBaseTable4();
+        createHBaseTable5();
         createEmptyHBaseTable();
     }
 
@@ -244,6 +246,13 @@
         createTable(tableName, families, SPLIT_KEYS);
     }
 
+    private static void createHBaseTable5() {
+        // create a table
+        byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
+        TableName tableName = TableName.valueOf(TEST_TABLE_5);
+        createTable(tableName, families, SPLIT_KEYS);
+    }
+
     private static void createEmptyHBaseTable() {
         // create a table
         byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
diff --git a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
index 8a10a1c..07c324d 100644
--- a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
+++ b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
@@ -124,11 +124,13 @@
         Configuration hbaseConf = getHBaseConfiguration(tableOptions);
         HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
-        HBaseTableSchema hbaseSchema =
-                HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
 
         return new HBaseDynamicTableSink(
-                tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral);
+                tableName,
+                context.getPhysicalRowDataType(),
+                hbaseConf,
+                hBaseWriteOptions,
+                nullStringLiteral);
     }
 
     @Override
diff --git a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java
index 299a457..fa8ab78 100644
--- a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java
+++ b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java
@@ -23,34 +23,46 @@
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
 import org.apache.flink.connector.hbase.sink.HBaseSinkFunction;
 import org.apache.flink.connector.hbase.sink.RowDataToMutationConverter;
+import org.apache.flink.connector.hbase.sink.WritableMetadata;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
 
 import org.apache.hadoop.conf.Configuration;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 /** HBase table sink implementation. */
 @Internal
-public class HBaseDynamicTableSink implements DynamicTableSink {
+public class HBaseDynamicTableSink implements DynamicTableSink, SupportsWritingMetadata {
 
     private final String tableName;
     private final HBaseTableSchema hbaseTableSchema;
     private final Configuration hbaseConf;
     private final HBaseWriteOptions writeOptions;
     private final String nullStringLiteral;
+    private final DataType physicalDataType;
+
+    /** Metadata that is appended at the end of a physical sink row. */
+    private List<String> metadataKeys;
 
     public HBaseDynamicTableSink(
             String tableName,
-            HBaseTableSchema hbaseTableSchema,
+            DataType physicalDataType,
             Configuration hbaseConf,
             HBaseWriteOptions writeOptions,
             String nullStringLiteral) {
-
         this.tableName = tableName;
-        this.hbaseTableSchema = hbaseTableSchema;
+        this.physicalDataType = physicalDataType;
+        this.hbaseTableSchema = HBaseTableSchema.fromDataType(physicalDataType);
+        this.metadataKeys = Collections.emptyList();
         this.hbaseConf = hbaseConf;
         this.writeOptions = writeOptions;
         this.nullStringLiteral = nullStringLiteral;
@@ -64,6 +76,8 @@
                         hbaseConf,
                         new RowDataToMutationConverter(
                                 hbaseTableSchema,
+                                physicalDataType,
+                                metadataKeys,
                                 nullStringLiteral,
                                 writeOptions.isIgnoreNullValue()),
                         writeOptions.getBufferFlushMaxSizeInBytes(),
@@ -85,9 +99,19 @@
     }
 
     @Override
+    public Map<String, DataType> listWritableMetadata() {
+        return WritableMetadata.list();
+    }
+
+    @Override
+    public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
+        this.metadataKeys = metadataKeys;
+    }
+
+    @Override
     public DynamicTableSink copy() {
         return new HBaseDynamicTableSink(
-                tableName, hbaseTableSchema, hbaseConf, writeOptions, nullStringLiteral);
+                tableName, physicalDataType, hbaseConf, writeOptions, nullStringLiteral);
     }
 
     @Override
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index df72146..c73bbc3 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -402,6 +402,56 @@
     }
 
     @Test
+    public void testTableSinkWithTimestampMetadata() throws Exception {
+        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+        tEnv.executeSql(
+                "CREATE TABLE hTableForSink ("
+                        + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+                        + " family1 ROW<col1 INT>,"
+                        + " version TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp'"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-2.2',"
+                        + " 'table-name' = '"
+                        + TEST_TABLE_5
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")");
+
+        String insert =
+                "INSERT INTO hTableForSink VALUES"
+                        + "(1, ROW(1), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
+                        + "(2, ROW(2), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
+                        + "(3, ROW(3), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
+                        + "(1, ROW(10), TO_TIMESTAMP_LTZ(1696767943269, 3)),"
+                        + "(2, ROW(20), TO_TIMESTAMP_LTZ(1696767943271, 3))";
+        tEnv.executeSql(insert).await();
+
+        tEnv.executeSql(
+                "CREATE TABLE hTableForQuery ("
+                        + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+                        + " family1 ROW<col1 INT>"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-2.2',"
+                        + " 'table-name' = '"
+                        + TEST_TABLE_5
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")");
+        TableResult result = tEnv.executeSql("SELECT rowkey, family1.col1 FROM hTableForQuery");
+        List<Row> results = CollectionUtil.iteratorToList(result.collect());
+
+        String expected = "+I[1, 1]\n+I[2, 20]\n+I[3, 3]\n";
+
+        TestBaseUtils.compareResultAsText(results, expected);
+    }
+
+    @Test
     public void testTableSourceSinkWithDDL() throws Exception {
         StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
@@ -538,7 +588,12 @@
                 new HBaseSinkFunction<>(
                         TEST_NOT_EXISTS_TABLE,
                         hbaseConf,
-                        new RowDataToMutationConverter(tableSchema, "null", false),
+                        new RowDataToMutationConverter(
+                                tableSchema,
+                                tableSchema.convertToDataType(),
+                                Collections.emptyList(),
+                                "null",
+                                false),
                         2 * 1024 * 1024,
                         1000,
                         1000);
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
index 1bf60d5..d6a0a9e 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
@@ -44,6 +44,7 @@
     protected static final String TEST_TABLE_2 = "testTable2";
     protected static final String TEST_TABLE_3 = "testTable3";
     protected static final String TEST_TABLE_4 = "testTable4";
+    protected static final String TEST_TABLE_5 = "testTable5";
     protected static final String TEST_EMPTY_TABLE = "testEmptyTable";
     protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable";
 
@@ -96,6 +97,7 @@
         createHBaseTable2();
         createHBaseTable3();
         createHBaseTable4();
+        createHBaseTable5();
         createEmptyHBaseTable();
     }
 
@@ -244,6 +246,13 @@
         createTable(tableName, families, SPLIT_KEYS);
     }
 
+    private static void createHBaseTable5() {
+        // create a table
+        byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
+        TableName tableName = TableName.valueOf(TEST_TABLE_5);
+        createTable(tableName, families, SPLIT_KEYS);
+    }
+
     private static void createEmptyHBaseTable() {
         // create a table
         byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
index f07377c..f9a13c8 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
@@ -18,13 +18,17 @@
 
 package org.apache.flink.connector.hbase.sink;
 
+import org.apache.flink.connector.hbase.sink.WritableMetadata.TimestampMetadata;
 import org.apache.flink.connector.hbase.util.HBaseSerde;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
 
 import org.apache.hadoop.hbase.client.Mutation;
 
+import java.util.List;
+
 /**
  * An implementation of {@link HBaseMutationConverter} which converts {@link RowData} into {@link
  * Mutation}.
@@ -35,13 +39,19 @@
     private final HBaseTableSchema schema;
     private final String nullStringLiteral;
     private final boolean ignoreNullValue;
+    private final TimestampMetadata timestampMetadata;
     private transient HBaseSerde serde;
 
     public RowDataToMutationConverter(
-            HBaseTableSchema schema, final String nullStringLiteral, boolean ignoreNullValue) {
+            HBaseTableSchema schema,
+            DataType physicalDataType,
+            List<String> metadataKeys,
+            String nullStringLiteral,
+            boolean ignoreNullValue) {
         this.schema = schema;
         this.nullStringLiteral = nullStringLiteral;
         this.ignoreNullValue = ignoreNullValue;
+        this.timestampMetadata = TimestampMetadata.of(metadataKeys, physicalDataType);
     }
 
     @Override
@@ -51,11 +61,12 @@
 
     @Override
     public Mutation convertToMutation(RowData record) {
+        Long timestamp = timestampMetadata.read(record);
         RowKind kind = record.getRowKind();
         if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
-            return serde.createPutMutation(record);
+            return serde.createPutMutation(record, timestamp);
         } else {
-            return serde.createDeleteMutation(record);
+            return serde.createDeleteMutation(record, timestamp);
         }
     }
 }
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java
new file mode 100644
index 0000000..c7e9e98
--- /dev/null
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.hadoop.hbase.HConstants;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Writable metadata for HBase. */
+public abstract class WritableMetadata<T> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Returns the map of metadata keys and their corresponding data types that can be consumed by
+     * HBase sink for writing.
+     *
+     * <p>Note: All the supported writable metadata should be manually registered in it.
+     */
+    public static Map<String, DataType> list() {
+        Map<String, DataType> metadataMap = new HashMap<>();
+        metadataMap.put(TimestampMetadata.KEY, TimestampMetadata.DATA_TYPE);
+        return Collections.unmodifiableMap(metadataMap);
+    }
+
+    public abstract T read(RowData row);
+
+    /** Timestamp metadata for HBase. */
+    public static class TimestampMetadata extends WritableMetadata<Long> {
+
+        public static final String KEY = "timestamp";
+        public static final DataType DATA_TYPE =
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable();
+
+        private final int pos;
+
+        public TimestampMetadata(int pos) {
+            this.pos = pos;
+        }
+
+        @Override
+        public Long read(RowData row) {
+            if (pos < 0) {
+                return HConstants.LATEST_TIMESTAMP;
+            }
+            if (row.isNullAt(pos)) {
+                throw new IllegalArgumentException(
+                        String.format("Writable metadata '%s' can not accept null value", KEY));
+            }
+            return row.getTimestamp(pos, 3).getMillisecond();
+        }
+
+        public static TimestampMetadata of(List<String> metadataKeys, DataType physicalDataType) {
+            int pos = metadataKeys.indexOf(TimestampMetadata.KEY);
+            if (pos < 0) {
+                return new TimestampMetadata(-1);
+            }
+            return new TimestampMetadata(
+                    pos + physicalDataType.getLogicalType().getChildren().size());
+        }
+    }
+}
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
index 458b25d..d381033 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
@@ -135,7 +135,7 @@
      *
      * @return The appropriate instance of Put for this use case.
      */
-    public @Nullable Put createPutMutation(RowData row) {
+    public @Nullable Put createPutMutation(RowData row, long timestamp) {
         checkArgument(keyEncoder != null, "row key is not set.");
         byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
         if (rowkey.length == 0) {
@@ -143,7 +143,7 @@
             return null;
         }
         // upsert
-        Put put = new Put(rowkey);
+        Put put = new Put(rowkey, timestamp);
         for (int i = 0; i < fieldLength; i++) {
             if (i != rowkeyIndex) {
                 int f = i > rowkeyIndex ? i - 1 : i;
@@ -172,7 +172,7 @@
      *
      * @return The appropriate instance of Delete for this use case.
      */
-    public @Nullable Delete createDeleteMutation(RowData row) {
+    public @Nullable Delete createDeleteMutation(RowData row, long timestamp) {
         checkArgument(keyEncoder != null, "row key is not set.");
         byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
         if (rowkey.length == 0) {
@@ -180,7 +180,7 @@
             return null;
         }
         // delete
-        Delete delete = new Delete(rowkey);
+        Delete delete = new Delete(rowkey, timestamp);
         for (int i = 0; i < fieldLength; i++) {
             if (i != rowkeyIndex) {
                 int f = i > rowkeyIndex ? i - 1 : i;
diff --git a/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
index e370809..85de7c8 100644
--- a/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
+++ b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
@@ -25,6 +25,7 @@
 import org.apache.flink.table.types.DataType;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -105,7 +106,7 @@
     @Test
     public void writeIgnoreNullValueTest() {
         HBaseSerde serde = createHBaseSerde(false);
-        Put m1 = serde.createPutMutation(prepareRowData());
+        Put m1 = serde.createPutMutation(prepareRowData(), HConstants.LATEST_TIMESTAMP);
         assert m1 != null;
         assertThat(m1.getRow()).isNotEmpty();
         assertThat(m1.get(FAMILY1.getBytes(), F1COL1.getBytes())).isNotEmpty();
@@ -116,7 +117,9 @@
         assertThat(m1.get(FAMILY3.getBytes(), F3COL3.getBytes())).isNotEmpty();
 
         HBaseSerde writeIgnoreNullValueSerde = createHBaseSerde(true);
-        Put m2 = writeIgnoreNullValueSerde.createPutMutation(prepareRowData());
+        Put m2 =
+                writeIgnoreNullValueSerde.createPutMutation(
+                        prepareRowData(), HConstants.LATEST_TIMESTAMP);
         assert m2 != null;
         assertThat(m2.getRow()).isNotEmpty();
         assertThat(m2.get(FAMILY1.getBytes(), F1COL1.getBytes())).isEmpty();