[FLINK-17528][table] Remove ArrayData#get() API and use ElementGetter instead
This closes #13653
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
index a446fd8..e7530f1 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
+++ b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
@@ -203,6 +203,7 @@
private RowDataToJsonConverter createArrayConverter(ArrayType type) {
final LogicalType elementType = type.getElementType();
final RowDataToJsonConverter elementConverter = createConverter(elementType);
+ final ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType);
return (mapper, reuse, value) -> {
ArrayNode node;
@@ -217,7 +218,7 @@
ArrayData array = (ArrayData) value;
int numElements = array.size();
for (int i = 0; i < numElements; i++) {
- Object element = ArrayData.get(array, i, elementType);
+ Object element = elementGetter.getElementOrNull(array, i);
node.add(elementConverter.convert(mapper, null, element));
}
@@ -233,6 +234,7 @@
"The type is: " + typeSummary);
}
final RowDataToJsonConverter valueConverter = createConverter(valueType);
+ final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);
return (mapper, reuse, object) -> {
ObjectNode node;
// reuse could be a NullNode if last record is null.
@@ -248,7 +250,7 @@
int numElements = map.size();
for (int i = 0; i < numElements; i++) {
String fieldName = keyArray.getString(i).toString(); // key must be string
- Object value = ArrayData.get(valueArray, i, valueType);
+ Object value = valueGetter.getElementOrNull(valueArray, i);
node.set(fieldName, valueConverter.convert(mapper, node.get(fieldName), value));
}