[FLINK-33208] Support writable timestamp metadata. This closes #24
Co-authored-by: Tan-JiaLiang <tanjialiang1997@gmail.com>
Co-authored-by: Ferenc Csaky <ferenc.csaky@pm.me>
diff --git a/docs/content.zh/docs/connectors/table/hbase.md b/docs/content.zh/docs/connectors/table/hbase.md
index 79aae7b..db68d37 100644
--- a/docs/content.zh/docs/connectors/table/hbase.md
+++ b/docs/content.zh/docs/connectors/table/hbase.md
@@ -75,6 +75,33 @@
ON myTopic.key = hTable.rowkey;
```
+可用的元数据
+------------------
+
+以下的连接器元数据可以在表定义中通过元数据列的形式获取。
+
+`R/W` 列定义了一个元数据字段是可读的(`R`)和/或可写的(`W`)。
+只读列必须声明为 `VIRTUAL` 以在 `INSERT INTO` 操作中排除它们。
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Key</th>
+ <th class="text-center" style="width: 30%">Data Type</th>
+ <th class="text-center" style="width: 40%">Description</th>
+ <th class="text-center" style="width: 5%">R/W</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><code>timestamp</code></td>
+ <td><code>TIMESTAMP_LTZ(3) NOT NULL</code></td>
+ <td>HBase 记录的时间戳，会被用作写入数据的版本（cell timestamp）。</td>
+ <td><code>W</code></td>
+ </tr>
+ </tbody>
+</table>
+
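+以下示例展示了如何在 sink 表中使用可写的 `timestamp` 元数据列（仅为示意，表名、schema 与连接器参数均为占位符）：
+
+```sql
+CREATE TABLE hTableWithVersion (
+  rowkey INT PRIMARY KEY NOT ENFORCED,
+  family1 ROW<col1 INT>,
+  -- 写入该列的值会被用作 HBase 数据的时间戳（版本）
+  version TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
+) WITH (
+  'connector' = 'hbase-2.2',
+  'table-name' = 'mytable',
+  'zookeeper.quorum' = 'localhost:2181'
+);
+```
+
+带有显式时间戳的写入遵循 HBase 的版本机制：时间戳较旧的写入不会覆盖较新的版本。
+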
连接器参数
----------------
diff --git a/docs/content/docs/connectors/table/hbase.md b/docs/content/docs/connectors/table/hbase.md
index 31f6895..8c723da 100644
--- a/docs/content/docs/connectors/table/hbase.md
+++ b/docs/content/docs/connectors/table/hbase.md
@@ -77,6 +77,33 @@
ON myTopic.key = hTable.rowkey;
```
+Available Metadata
+------------------
+
+The following connector metadata can be accessed as metadata columns in a table definition.
+
+The `R/W` column defines whether a metadata field is readable (`R`) and/or writable (`W`).
+Read-only columns must be declared `VIRTUAL` to exclude them during an `INSERT INTO` operation.
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Key</th>
+ <th class="text-center" style="width: 30%">Data Type</th>
+ <th class="text-center" style="width: 40%">Description</th>
+ <th class="text-center" style="width: 5%">R/W</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><code>timestamp</code></td>
+ <td><code>TIMESTAMP_LTZ(3) NOT NULL</code></td>
+ <td>Timestamp of the HBase mutation. The value is used as the cell timestamp (version) of the written data.</td>
+ <td><code>W</code></td>
+ </tr>
+ </tbody>
+</table>
+
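+The following example shows how the writable `timestamp` metadata could be used in a sink table
+(a minimal sketch; the table name, schema, and connector options below are placeholders):
+
+```sql
+CREATE TABLE hTableWithVersion (
+  rowkey INT PRIMARY KEY NOT ENFORCED,
+  family1 ROW<col1 INT>,
+  -- the value written to this column is used as the HBase cell timestamp (version)
+  version TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
+) WITH (
+  'connector' = 'hbase-2.2',
+  'table-name' = 'mytable',
+  'zookeeper.quorum' = 'localhost:2181'
+);
+```
+
+Writes with an explicit timestamp follow HBase's normal versioning, so a write carrying an older
+timestamp does not replace a newer version of the same cell.
+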
Connector Options
----------------
diff --git a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
index 5321bf2..19e4d1a 100644
--- a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
+++ b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
@@ -122,11 +122,13 @@
Configuration hbaseConf = getHBaseConfiguration(tableOptions);
HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
- HBaseTableSchema hbaseSchema =
- HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
return new HBaseDynamicTableSink(
- tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral);
+ tableName,
+ context.getPhysicalRowDataType(),
+ hbaseConf,
+ hBaseWriteOptions,
+ nullStringLiteral);
}
@Override
diff --git a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java
index 0dec937..456948f 100644
--- a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java
+++ b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java
@@ -23,36 +23,49 @@
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.hbase.sink.HBaseSinkFunction;
import org.apache.flink.connector.hbase.sink.RowDataToMutationConverter;
+import org.apache.flink.connector.hbase.sink.WritableMetadata;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.conf.Configuration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
/** HBase table sink implementation. */
@Internal
-public class HBaseDynamicTableSink implements DynamicTableSink {
+public class HBaseDynamicTableSink implements DynamicTableSink, SupportsWritingMetadata {
private final HBaseTableSchema hbaseTableSchema;
private final String nullStringLiteral;
private final Configuration hbaseConf;
private final HBaseWriteOptions writeOptions;
private final String tableName;
+ private final DataType physicalDataType;
+
+ /** Metadata that is appended at the end of a physical sink row. */
+ private List<String> metadataKeys;
public HBaseDynamicTableSink(
String tableName,
- HBaseTableSchema hbaseTableSchema,
+ DataType physicalDataType,
Configuration hbaseConf,
HBaseWriteOptions writeOptions,
String nullStringLiteral) {
- this.hbaseTableSchema = hbaseTableSchema;
- this.nullStringLiteral = nullStringLiteral;
+ this.tableName = tableName;
+ this.physicalDataType = physicalDataType;
+ this.hbaseTableSchema = HBaseTableSchema.fromDataType(physicalDataType);
+ this.metadataKeys = Collections.emptyList();
this.hbaseConf = hbaseConf;
this.writeOptions = writeOptions;
- this.tableName = tableName;
+ this.nullStringLiteral = nullStringLiteral;
}
@Override
@@ -63,6 +76,8 @@
hbaseConf,
new RowDataToMutationConverter(
hbaseTableSchema,
+ physicalDataType,
+ metadataKeys,
nullStringLiteral,
writeOptions.isIgnoreNullValue()),
writeOptions.getBufferFlushMaxSizeInBytes(),
@@ -84,9 +99,19 @@
}
@Override
+ public Map<String, DataType> listWritableMetadata() {
+ return WritableMetadata.list();
+ }
+
+ @Override
+ public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
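+ // The planner passes only the metadata keys that are actually declared in the table's DDL.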
+ this.metadataKeys = metadataKeys;
+ }
+
+ @Override
public DynamicTableSink copy() {
return new HBaseDynamicTableSink(
- tableName, hbaseTableSchema, hbaseConf, writeOptions, nullStringLiteral);
+ tableName, physicalDataType, hbaseConf, writeOptions, nullStringLiteral);
}
@Override
diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
index 31290a7..1c53187 100644
--- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
@@ -373,6 +373,56 @@
}
@Test
+ public void testTableSinkWithTimestampMetadata() throws Exception {
+ StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+ tEnv.executeSql(
+ "CREATE TABLE hTableForSink ("
+ + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+ + " family1 ROW<col1 INT>,"
+ + " version TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'"
+ + ") WITH ("
+ + " 'connector' = 'hbase-1.4',"
+ + " 'table-name' = '"
+ + TEST_TABLE_5
+ + "',"
+ + " 'zookeeper.quorum' = '"
+ + getZookeeperQuorum()
+ + "'"
+ + ")");
+
+ String insert =
+ "INSERT INTO hTableForSink VALUES"
+ + "(1, ROW(1), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
+ + "(2, ROW(2), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
+ + "(3, ROW(3), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
+ + "(1, ROW(10), TO_TIMESTAMP_LTZ(1696767943269, 3)),"
+ + "(2, ROW(20), TO_TIMESTAMP_LTZ(1696767943271, 3))";
+ tEnv.executeSql(insert).await();
+
+ tEnv.executeSql(
+ "CREATE TABLE hTableForQuery ("
+ + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+ + " family1 ROW<col1 INT>"
+ + ") WITH ("
+ + " 'connector' = 'hbase-1.4',"
+ + " 'table-name' = '"
+ + TEST_TABLE_5
+ + "',"
+ + " 'zookeeper.quorum' = '"
+ + getZookeeperQuorum()
+ + "'"
+ + ")");
+ TableResult result = tEnv.executeSql("SELECT rowkey, family1.col1 FROM hTableForQuery");
+ List<Row> results = CollectionUtil.iteratorToList(result.collect());
+
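+ // Reads return the newest cell version: the older write to rowkey 1 (ts 1696767943269) must not
+ // replace the value written at ts 1696767943270, while the newer write to rowkey 2 must.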
+ String expected = "+I[1, 1]\n+I[2, 20]\n+I[3, 3]\n";
+
+ TestBaseUtils.compareResultAsText(results, expected);
+ }
+
+ @Test
public void testTableSourceSinkWithDDL() throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
@@ -554,7 +604,12 @@
new HBaseSinkFunction<>(
TEST_NOT_EXISTS_TABLE,
hbaseConf,
- new RowDataToMutationConverter(tableSchema, "null", false),
+ new RowDataToMutationConverter(
+ tableSchema,
+ tableSchema.convertToDataType(),
+ Collections.emptyList(),
+ "null",
+ false),
2 * 1024 * 1024,
1000,
1000);
diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
index 530285d..c3512a9 100644
--- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
+++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
@@ -44,6 +44,7 @@
protected static final String TEST_TABLE_2 = "testTable2";
protected static final String TEST_TABLE_3 = "testTable3";
protected static final String TEST_TABLE_4 = "testTable4";
+ protected static final String TEST_TABLE_5 = "testTable5";
protected static final String TEST_EMPTY_TABLE = "testEmptyTable";
protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable";
@@ -96,6 +97,7 @@
createHBaseTable2();
createHBaseTable3();
createHBaseTable4();
+ createHBaseTable5();
createEmptyHBaseTable();
}
@@ -244,6 +246,13 @@
createTable(tableName, families, SPLIT_KEYS);
}
+ private static void createHBaseTable5() {
+ // create a table
+ byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
+ TableName tableName = TableName.valueOf(TEST_TABLE_5);
+ createTable(tableName, families, SPLIT_KEYS);
+ }
+
private static void createEmptyHBaseTable() {
// create a table
byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
diff --git a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
index 8a10a1c..07c324d 100644
--- a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
+++ b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
@@ -124,11 +124,13 @@
Configuration hbaseConf = getHBaseConfiguration(tableOptions);
HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
- HBaseTableSchema hbaseSchema =
- HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
return new HBaseDynamicTableSink(
- tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral);
+ tableName,
+ context.getPhysicalRowDataType(),
+ hbaseConf,
+ hBaseWriteOptions,
+ nullStringLiteral);
}
@Override
diff --git a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java
index 299a457..fa8ab78 100644
--- a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java
+++ b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java
@@ -23,34 +23,46 @@
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.hbase.sink.HBaseSinkFunction;
import org.apache.flink.connector.hbase.sink.RowDataToMutationConverter;
+import org.apache.flink.connector.hbase.sink.WritableMetadata;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.conf.Configuration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
/** HBase table sink implementation. */
@Internal
-public class HBaseDynamicTableSink implements DynamicTableSink {
+public class HBaseDynamicTableSink implements DynamicTableSink, SupportsWritingMetadata {
private final String tableName;
private final HBaseTableSchema hbaseTableSchema;
private final Configuration hbaseConf;
private final HBaseWriteOptions writeOptions;
private final String nullStringLiteral;
+ private final DataType physicalDataType;
+
+ /** Metadata that is appended at the end of a physical sink row. */
+ private List<String> metadataKeys;
public HBaseDynamicTableSink(
String tableName,
- HBaseTableSchema hbaseTableSchema,
+ DataType physicalDataType,
Configuration hbaseConf,
HBaseWriteOptions writeOptions,
String nullStringLiteral) {
-
this.tableName = tableName;
- this.hbaseTableSchema = hbaseTableSchema;
+ this.physicalDataType = physicalDataType;
+ this.hbaseTableSchema = HBaseTableSchema.fromDataType(physicalDataType);
+ this.metadataKeys = Collections.emptyList();
this.hbaseConf = hbaseConf;
this.writeOptions = writeOptions;
this.nullStringLiteral = nullStringLiteral;
@@ -64,6 +76,8 @@
hbaseConf,
new RowDataToMutationConverter(
hbaseTableSchema,
+ physicalDataType,
+ metadataKeys,
nullStringLiteral,
writeOptions.isIgnoreNullValue()),
writeOptions.getBufferFlushMaxSizeInBytes(),
@@ -85,9 +99,19 @@
}
@Override
+ public Map<String, DataType> listWritableMetadata() {
+ return WritableMetadata.list();
+ }
+
+ @Override
+ public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
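+ // The planner passes only the metadata keys that are actually declared in the table's DDL.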
+ this.metadataKeys = metadataKeys;
+ }
+
+ @Override
public DynamicTableSink copy() {
return new HBaseDynamicTableSink(
- tableName, hbaseTableSchema, hbaseConf, writeOptions, nullStringLiteral);
+ tableName, physicalDataType, hbaseConf, writeOptions, nullStringLiteral);
}
@Override
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index df72146..c73bbc3 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -402,6 +402,56 @@
}
@Test
+ public void testTableSinkWithTimestampMetadata() throws Exception {
+ StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+ tEnv.executeSql(
+ "CREATE TABLE hTableForSink ("
+ + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+ + " family1 ROW<col1 INT>,"
+ + " version TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp'"
+ + ") WITH ("
+ + " 'connector' = 'hbase-2.2',"
+ + " 'table-name' = '"
+ + TEST_TABLE_5
+ + "',"
+ + " 'zookeeper.quorum' = '"
+ + getZookeeperQuorum()
+ + "'"
+ + ")");
+
+ String insert =
+ "INSERT INTO hTableForSink VALUES"
+ + "(1, ROW(1), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
+ + "(2, ROW(2), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
+ + "(3, ROW(3), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
+ + "(1, ROW(10), TO_TIMESTAMP_LTZ(1696767943269, 3)),"
+ + "(2, ROW(20), TO_TIMESTAMP_LTZ(1696767943271, 3))";
+ tEnv.executeSql(insert).await();
+
+ tEnv.executeSql(
+ "CREATE TABLE hTableForQuery ("
+ + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+ + " family1 ROW<col1 INT>"
+ + ") WITH ("
+ + " 'connector' = 'hbase-2.2',"
+ + " 'table-name' = '"
+ + TEST_TABLE_5
+ + "',"
+ + " 'zookeeper.quorum' = '"
+ + getZookeeperQuorum()
+ + "'"
+ + ")");
+ TableResult result = tEnv.executeSql("SELECT rowkey, family1.col1 FROM hTableForQuery");
+ List<Row> results = CollectionUtil.iteratorToList(result.collect());
+
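+ // Reads return the newest cell version: the older write to rowkey 1 (ts 1696767943269) must not
+ // replace the value written at ts 1696767943270, while the newer write to rowkey 2 must.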
+ String expected = "+I[1, 1]\n+I[2, 20]\n+I[3, 3]\n";
+
+ TestBaseUtils.compareResultAsText(results, expected);
+ }
+
+ @Test
public void testTableSourceSinkWithDDL() throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
@@ -538,7 +588,12 @@
new HBaseSinkFunction<>(
TEST_NOT_EXISTS_TABLE,
hbaseConf,
- new RowDataToMutationConverter(tableSchema, "null", false),
+ new RowDataToMutationConverter(
+ tableSchema,
+ tableSchema.convertToDataType(),
+ Collections.emptyList(),
+ "null",
+ false),
2 * 1024 * 1024,
1000,
1000);
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
index 1bf60d5..d6a0a9e 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
@@ -44,6 +44,7 @@
protected static final String TEST_TABLE_2 = "testTable2";
protected static final String TEST_TABLE_3 = "testTable3";
protected static final String TEST_TABLE_4 = "testTable4";
+ protected static final String TEST_TABLE_5 = "testTable5";
protected static final String TEST_EMPTY_TABLE = "testEmptyTable";
protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable";
@@ -96,6 +97,7 @@
createHBaseTable2();
createHBaseTable3();
createHBaseTable4();
+ createHBaseTable5();
createEmptyHBaseTable();
}
@@ -244,6 +246,13 @@
createTable(tableName, families, SPLIT_KEYS);
}
+ private static void createHBaseTable5() {
+ // create a table
+ byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
+ TableName tableName = TableName.valueOf(TEST_TABLE_5);
+ createTable(tableName, families, SPLIT_KEYS);
+ }
+
private static void createEmptyHBaseTable() {
// create a table
byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
index f07377c..f9a13c8 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
@@ -18,13 +18,17 @@
package org.apache.flink.connector.hbase.sink;
+import org.apache.flink.connector.hbase.sink.WritableMetadata.TimestampMetadata;
import org.apache.flink.connector.hbase.util.HBaseSerde;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.hbase.client.Mutation;
+import java.util.List;
+
/**
* An implementation of {@link HBaseMutationConverter} which converts {@link RowData} into {@link
* Mutation}.
@@ -35,13 +39,19 @@
private final HBaseTableSchema schema;
private final String nullStringLiteral;
private final boolean ignoreNullValue;
+ private final TimestampMetadata timestampMetadata;
private transient HBaseSerde serde;
public RowDataToMutationConverter(
- HBaseTableSchema schema, final String nullStringLiteral, boolean ignoreNullValue) {
+ HBaseTableSchema schema,
+ DataType physicalDataType,
+ List<String> metadataKeys,
+ String nullStringLiteral,
+ boolean ignoreNullValue) {
this.schema = schema;
this.nullStringLiteral = nullStringLiteral;
this.ignoreNullValue = ignoreNullValue;
+ this.timestampMetadata = TimestampMetadata.of(metadataKeys, physicalDataType);
}
@Override
@@ -51,11 +61,12 @@
@Override
public Mutation convertToMutation(RowData record) {
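+ // Resolve the cell timestamp from the writable 'timestamp' metadata column
+ // (HConstants.LATEST_TIMESTAMP when the column is not declared).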
+ Long timestamp = timestampMetadata.read(record);
RowKind kind = record.getRowKind();
if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
- return serde.createPutMutation(record);
+ return serde.createPutMutation(record, timestamp);
} else {
- return serde.createDeleteMutation(record);
+ return serde.createDeleteMutation(record, timestamp);
}
}
}
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java
new file mode 100644
index 0000000..c7e9e98
--- /dev/null
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.hadoop.hbase.HConstants;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Writable metadata for HBase. */
+public abstract class WritableMetadata<T> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Returns the map of metadata keys and their corresponding data types that can be consumed
+ * by the HBase sink for writing.
+ *
+ * <p>Note: every supported writable metadata key must be registered in this map.
+ */
+ public static Map<String, DataType> list() {
+ Map<String, DataType> metadataMap = new HashMap<>();
+ metadataMap.put(TimestampMetadata.KEY, TimestampMetadata.DATA_TYPE);
+ return Collections.unmodifiableMap(metadataMap);
+ }
+
+ public abstract T read(RowData row);
+
+ /** Timestamp metadata for HBase. */
+ public static class TimestampMetadata extends WritableMetadata<Long> {
+
+ public static final String KEY = "timestamp";
+ public static final DataType DATA_TYPE =
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable();
+
+ private final int pos;
+
+ public TimestampMetadata(int pos) {
+ this.pos = pos;
+ }
+
+ @Override
+ public Long read(RowData row) {
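+ // When no 'timestamp' metadata column is declared, fall back to HBase's
+ // LATEST_TIMESTAMP so the region server assigns the write time.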
+ if (pos < 0) {
+ return HConstants.LATEST_TIMESTAMP;
+ }
+ if (row.isNullAt(pos)) {
+ throw new IllegalArgumentException(
+ String.format("Writable metadata '%s' can not accept null value", KEY));
+ }
+ return row.getTimestamp(pos, 3).getMillisecond();
+ }
+
+ public static TimestampMetadata of(List<String> metadataKeys, DataType physicalDataType) {
+ int pos = metadataKeys.indexOf(TimestampMetadata.KEY);
+ if (pos < 0) {
+ return new TimestampMetadata(-1);
+ }
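+ // Metadata columns are appended after all physical columns in the consumed row,
+ // so the absolute field index is the metadata offset plus the physical field count.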
+ return new TimestampMetadata(
+ pos + physicalDataType.getLogicalType().getChildren().size());
+ }
+ }
+}
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
index 458b25d..d381033 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
@@ -135,7 +135,7 @@
*
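+ * @param timestamp the cell timestamp (version) applied to the generated {@code Put};
+ *     {@code HConstants.LATEST_TIMESTAMP} lets HBase assign the current server time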
* @return The appropriate instance of Put for this use case.
*/
- public @Nullable Put createPutMutation(RowData row) {
+ public @Nullable Put createPutMutation(RowData row, long timestamp) {
checkArgument(keyEncoder != null, "row key is not set.");
byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
if (rowkey.length == 0) {
@@ -143,7 +143,7 @@
return null;
}
// upsert
- Put put = new Put(rowkey);
+ Put put = new Put(rowkey, timestamp);
for (int i = 0; i < fieldLength; i++) {
if (i != rowkeyIndex) {
int f = i > rowkeyIndex ? i - 1 : i;
@@ -172,7 +172,7 @@
*
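+ * @param timestamp the cell timestamp of the delete markers; versions not newer than this
+ *     timestamp are deleted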
* @return The appropriate instance of Delete for this use case.
*/
- public @Nullable Delete createDeleteMutation(RowData row) {
+ public @Nullable Delete createDeleteMutation(RowData row, long timestamp) {
checkArgument(keyEncoder != null, "row key is not set.");
byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
if (rowkey.length == 0) {
@@ -180,7 +180,7 @@
return null;
}
// delete
- Delete delete = new Delete(rowkey);
+ Delete delete = new Delete(rowkey, timestamp);
for (int i = 0; i < fieldLength; i++) {
if (i != rowkeyIndex) {
int f = i > rowkeyIndex ? i - 1 : i;
diff --git a/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
index e370809..85de7c8 100644
--- a/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
+++ b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
@@ -25,6 +25,7 @@
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -105,7 +106,7 @@
@Test
public void writeIgnoreNullValueTest() {
HBaseSerde serde = createHBaseSerde(false);
- Put m1 = serde.createPutMutation(prepareRowData());
+ Put m1 = serde.createPutMutation(prepareRowData(), HConstants.LATEST_TIMESTAMP);
assert m1 != null;
assertThat(m1.getRow()).isNotEmpty();
assertThat(m1.get(FAMILY1.getBytes(), F1COL1.getBytes())).isNotEmpty();
@@ -116,7 +117,9 @@
assertThat(m1.get(FAMILY3.getBytes(), F3COL3.getBytes())).isNotEmpty();
HBaseSerde writeIgnoreNullValueSerde = createHBaseSerde(true);
- Put m2 = writeIgnoreNullValueSerde.createPutMutation(prepareRowData());
+ Put m2 =
+ writeIgnoreNullValueSerde.createPutMutation(
+ prepareRowData(), HConstants.LATEST_TIMESTAMP);
assert m2 != null;
assertThat(m2.getRow()).isNotEmpty();
assertThat(m2.get(FAMILY1.getBytes(), F1COL1.getBytes())).isEmpty();