[FLINK-35641] ParquetSchemaConverter supports required fields (#24956)
diff --git a/docs/content/docs/connectors/table/formats/parquet.md b/docs/content/docs/connectors/table/formats/parquet.md
index 2b543c0..c5f7c42 100644
--- a/docs/content/docs/connectors/table/formats/parquet.md
+++ b/docs/content/docs/connectors/table/formats/parquet.md
@@ -121,6 +121,7 @@
<th class="text-left">Flink Data Type</th>
<th class="text-center">Parquet type</th>
<th class="text-center">Parquet logical type</th>
+ <th class="text-center">Limitations</th>
</tr>
</thead>
<tbody>
@@ -128,81 +129,103 @@
<td>CHAR / VARCHAR / STRING</td>
<td>BINARY</td>
<td>UTF8</td>
+ <td></td>
</tr>
<tr>
<td>BOOLEAN</td>
<td>BOOLEAN</td>
<td></td>
+ <td></td>
</tr>
<tr>
<td>BINARY / VARBINARY</td>
<td>BINARY</td>
<td></td>
+ <td></td>
</tr>
<tr>
<td>DECIMAL</td>
<td>FIXED_LEN_BYTE_ARRAY</td>
<td>DECIMAL</td>
+ <td></td>
</tr>
<tr>
<td>TINYINT</td>
<td>INT32</td>
<td>INT_8</td>
+ <td></td>
</tr>
<tr>
<td>SMALLINT</td>
<td>INT32</td>
<td>INT_16</td>
+ <td></td>
</tr>
<tr>
<td>INT</td>
<td>INT32</td>
<td></td>
+ <td></td>
</tr>
<tr>
<td>BIGINT</td>
<td>INT64</td>
<td></td>
+ <td></td>
</tr>
<tr>
<td>FLOAT</td>
<td>FLOAT</td>
<td></td>
+ <td></td>
</tr>
<tr>
<td>DOUBLE</td>
<td>DOUBLE</td>
<td></td>
+ <td></td>
</tr>
<tr>
<td>DATE</td>
<td>INT32</td>
<td>DATE</td>
+ <td></td>
</tr>
<tr>
<td>TIME</td>
<td>INT32</td>
<td>TIME_MILLIS</td>
+ <td></td>
</tr>
<tr>
<td>TIMESTAMP</td>
<td>INT96 (or INT64)</td>
<td></td>
+ <td></td>
</tr>
<tr>
<td>ARRAY</td>
<td>-</td>
<td>LIST</td>
+ <td></td>
</tr>
<tr>
<td>MAP</td>
<td>-</td>
<td>MAP</td>
+ <td><a href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps">Parquet does not support nullable map keys</a></td>
+ </tr>
+ <tr>
+ <td>MULTISET</td>
+ <td>-</td>
+ <td>MAP</td>
+ <td><a href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps">Parquet does not support nullable map keys</a></td>
</tr>
<tr>
<td>ROW</td>
<td>-</td>
<td>STRUCT</td>
+ <td></td>
</tr>
</tbody>
</table>
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
index d2d51d3..28ae2d8 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
@@ -404,6 +404,9 @@
recordConsumer.startField(keyName, 0);
keyWriter.write(keyArray, i);
recordConsumer.endField(keyName, 0);
+ } else {
+ throw new IllegalArgumentException(
+ "Parquet does not support null keys in maps. See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for more details.");
}
if (!valueArray.isNullAt(i)) {
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
index b7c91ee..0101fc1 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
@@ -61,7 +61,9 @@
}
public static Type convertToParquetType(String name, LogicalType type, Configuration conf) {
- return convertToParquetType(name, type, Type.Repetition.OPTIONAL, conf);
+ Type.Repetition repetition =
+ type.isNullable() ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED;
+ return convertToParquetType(name, type, repetition, conf);
}
private static Type convertToParquetType(
@@ -143,19 +145,31 @@
convertToParquetType(LIST_ELEMENT_NAME, arrayType.getElementType(), conf));
case MAP:
MapType mapType = (MapType) type;
+ LogicalType keyType = mapType.getKeyType();
+ if (keyType.isNullable()) {
+ // key is nullable, but Parquet does not support nullable keys, so we configure
+ // it as not nullable
+ keyType = keyType.copy(false);
+ }
return ConversionPatterns.mapType(
repetition,
name,
MAP_REPEATED_NAME,
- convertToParquetType("key", mapType.getKeyType(), conf),
+ convertToParquetType("key", keyType, conf),
convertToParquetType("value", mapType.getValueType(), conf));
case MULTISET:
MultisetType multisetType = (MultisetType) type;
+ LogicalType elementType = multisetType.getElementType();
+ if (elementType.isNullable()) {
+ // element type is nullable, but Parquet does not support nullable map keys,
+ // so we configure it as not nullable
+ elementType = elementType.copy(false);
+ }
return ConversionPatterns.mapType(
repetition,
name,
MAP_REPEATED_NAME,
- convertToParquetType("key", multisetType.getElementType(), conf),
+ convertToParquetType("key", elementType, conf),
convertToParquetType("value", new IntType(false), conf));
case ROW:
RowType rowType = (RowType) type;
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
index 27f7d2a..cd0bf00 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
@@ -72,6 +72,7 @@
import static org.apache.flink.formats.parquet.ParquetFileFormatFactory.TIMESTAMP_TIME_UNIT;
import static org.apache.flink.formats.parquet.ParquetFileFormatFactory.WRITE_INT64_TIMESTAMP;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link ParquetRowDataBuilder} and {@link ParquetRowDataWriter}. */
class ParquetRowDataWriterTest {
@@ -96,11 +97,16 @@
RowType.of(
new ArrayType(true, new IntType()),
new MapType(
- true,
new VarCharType(VarCharType.MAX_LENGTH),
new VarCharType(VarCharType.MAX_LENGTH)),
RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType()));
+ private static final RowType MAP_ROW_TYPE =
+ RowType.of(
+ new MapType(
+ new VarCharType(true, VarCharType.MAX_LENGTH),
+ new VarCharType(VarCharType.MAX_LENGTH)));
+
private static final RowType NESTED_ARRAY_MAP_TYPE =
RowType.of(
new IntType(),
@@ -109,7 +115,7 @@
true,
new MapType(
true,
- new VarCharType(VarCharType.MAX_LENGTH),
+ new VarCharType(false, VarCharType.MAX_LENGTH),
new VarCharType(VarCharType.MAX_LENGTH))));
private static final RowType NESTED_ARRAY_ROW_TYPE =
@@ -126,6 +132,11 @@
TypeConversions.fromLogicalToDataType(ROW_TYPE));
@SuppressWarnings("unchecked")
+ private static final DataFormatConverters.DataFormatConverter<RowData, Row> MAP_CONVERTER =
+ DataFormatConverters.getConverterForDataType(
+ TypeConversions.fromLogicalToDataType(MAP_ROW_TYPE));
+
+ @SuppressWarnings("unchecked")
private static final DataFormatConverters.DataFormatConverter<RowData, Row>
NESTED_ARRAY_MAP_CONVERTER =
DataFormatConverters.getConverterForDataType(
@@ -148,6 +159,8 @@
nestedArrayAndMapTest(folder, conf, false);
nestedArrayAndRowTest(folder, conf, true);
nestedArrayAndRowTest(folder, conf, false);
+ invalidTypeTest(folder, conf, true);
+ invalidTypeTest(folder, conf, false);
}
@Test
@@ -162,6 +175,8 @@
nestedArrayAndMapTest(folder, conf, false);
nestedArrayAndRowTest(folder, conf, true);
nestedArrayAndRowTest(folder, conf, false);
+ invalidTypeTest(folder, conf, true);
+ invalidTypeTest(folder, conf, false);
}
@Test
@@ -173,6 +188,8 @@
innerTest(folder, conf, false);
complexTypeTest(folder, conf, true);
complexTypeTest(folder, conf, false);
+ invalidTypeTest(folder, conf, true);
+ invalidTypeTest(folder, conf, false);
}
private void innerTest(java.nio.file.Path folder, Configuration conf, boolean utcTimestamp)
@@ -241,7 +258,6 @@
List<Row> rows = new ArrayList<>(number);
Map<String, String> mapData = new HashMap<>();
mapData.put("k1", "v1");
- mapData.put(null, "v2");
mapData.put("k2", null);
for (int i = 0; i < number; i++) {
@@ -264,6 +280,22 @@
assertThat(fileContent).isEqualTo(rows);
}
+ public void invalidTypeTest(java.nio.file.Path folder, Configuration conf, boolean utcTimestamp)
+ throws IOException {
+ Path path = new Path(folder.toString(), UUID.randomUUID().toString());
+ ParquetWriterFactory<RowData> factory =
+ ParquetRowDataBuilder.createWriterFactory(MAP_ROW_TYPE, conf, utcTimestamp);
+ final BulkWriter<RowData> rowDataBulkWriter =
+ factory.create(path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE));
+ Map<String, String> mapData = new HashMap<>();
+ mapData.put(null, "v1");
+ final Row row = Row.of(mapData);
+ assertThatThrownBy(
+ () -> rowDataBulkWriter.addElement(MAP_CONVERTER.toInternal(row)),
+ "Parquet does not support null keys in a map. See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for more details.")
+ .isInstanceOf(RuntimeException.class);
+ }
+
public void nestedArrayAndMapTest(
java.nio.file.Path folder, Configuration conf, boolean utcTimestamp) throws Exception {
Path path = new Path(folder.toString(), UUID.randomUUID().toString());
@@ -273,7 +305,6 @@
for (int i = 0; i < number; i++) {
Integer v = i;
Map<String, String> mp1 = new HashMap<>();
- mp1.put(null, "val_" + i);
Map<String, String> mp2 = new HashMap<>();
mp2.put("key_" + i, null);
mp2.put("key@" + i, "val@" + i);