[FLINK-35641] ParquetSchemaConverter supports required fields (#24956)

diff --git a/docs/content/docs/connectors/table/formats/parquet.md b/docs/content/docs/connectors/table/formats/parquet.md
index 2b543c0..c5f7c42 100644
--- a/docs/content/docs/connectors/table/formats/parquet.md
+++ b/docs/content/docs/connectors/table/formats/parquet.md
@@ -121,6 +121,7 @@
         <th class="text-left">Flink Data Type</th>
         <th class="text-center">Parquet type</th>
         <th class="text-center">Parquet logical type</th>
+        <th class="text-center">Limitations</th>
       </tr>
     </thead>
     <tbody>
@@ -128,81 +129,103 @@
       <td>CHAR / VARCHAR / STRING</td>
       <td>BINARY</td>
       <td>UTF8</td>
+      <td></td>
     </tr>
     <tr>
       <td>BOOLEAN</td>
       <td>BOOLEAN</td>
       <td></td>
+      <td></td>
     </tr>
     <tr>
       <td>BINARY / VARBINARY</td>
       <td>BINARY</td>
       <td></td>
+      <td></td>
     </tr>
     <tr>
       <td>DECIMAL</td>
       <td>FIXED_LEN_BYTE_ARRAY</td>
       <td>DECIMAL</td>
+      <td></td>
     </tr>
     <tr>
       <td>TINYINT</td>
       <td>INT32</td>
       <td>INT_8</td>
+      <td></td>
     </tr>
     <tr>
       <td>SMALLINT</td>
       <td>INT32</td>
       <td>INT_16</td>
+      <td></td>
     </tr>
     <tr>
       <td>INT</td>
       <td>INT32</td>
       <td></td>
+      <td></td>
     </tr>
     <tr>
       <td>BIGINT</td>
       <td>INT64</td>
       <td></td>
+      <td></td>
     </tr>
     <tr>
       <td>FLOAT</td>
       <td>FLOAT</td>
       <td></td>
+      <td></td>
     </tr>
     <tr>
       <td>DOUBLE</td>
       <td>DOUBLE</td>
       <td></td>
+      <td></td>
     </tr>
     <tr>
       <td>DATE</td>
       <td>INT32</td>
       <td>DATE</td>
+      <td></td>
     </tr>
     <tr>
       <td>TIME</td>
       <td>INT32</td>
       <td>TIME_MILLIS</td>
+      <td></td>
     </tr>
     <tr>
       <td>TIMESTAMP</td>
       <td>INT96 (or INT64)</td>
       <td></td>
+      <td></td>
     </tr>
     <tr>
       <td>ARRAY</td>
       <td>-</td>
       <td>LIST</td>
+      <td></td>
     </tr>
     <tr>
       <td>MAP</td>
       <td>-</td>
       <td>MAP</td>
+      <td><a href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps">Parquet does not support nullable map keys</a></td>
+    </tr>
+    <tr>
+      <td>MULTISET</td>
+      <td>-</td>
+      <td>MAP</td>
+      <td><a href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps">Parquet does not support nullable map keys</a></td>
     </tr>
     <tr>
       <td>ROW</td>
       <td>-</td>
       <td>STRUCT</td>
+      <td></td>
     </tr>
     </tbody>
 </table>
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
index d2d51d3..28ae2d8 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
@@ -404,6 +404,9 @@
                         recordConsumer.startField(keyName, 0);
                         keyWriter.write(keyArray, i);
                         recordConsumer.endField(keyName, 0);
+                    } else {
+                        throw new IllegalArgumentException(
+                                "Parquet does not support null keys in maps. See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for more details.");
                     }
 
                     if (!valueArray.isNullAt(i)) {
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
index b7c91ee..0101fc1 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
@@ -61,7 +61,9 @@
     }
 
     public static Type convertToParquetType(String name, LogicalType type, Configuration conf) {
-        return convertToParquetType(name, type, Type.Repetition.OPTIONAL, conf);
+        Type.Repetition repetition =
+                type.isNullable() ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED;
+        return convertToParquetType(name, type, repetition, conf);
     }
 
     private static Type convertToParquetType(
@@ -143,19 +145,31 @@
                         convertToParquetType(LIST_ELEMENT_NAME, arrayType.getElementType(), conf));
             case MAP:
                 MapType mapType = (MapType) type;
+                LogicalType keyType = mapType.getKeyType();
+                if (keyType.isNullable()) {
+                    // key is nullable, but Parquet does not support nullable keys, so we configure
+                    // it as not nullable
+                    keyType = keyType.copy(false);
+                }
                 return ConversionPatterns.mapType(
                         repetition,
                         name,
                         MAP_REPEATED_NAME,
-                        convertToParquetType("key", mapType.getKeyType(), conf),
+                        convertToParquetType("key", keyType, conf),
                         convertToParquetType("value", mapType.getValueType(), conf));
             case MULTISET:
                 MultisetType multisetType = (MultisetType) type;
+                LogicalType elementType = multisetType.getElementType();
+                if (elementType.isNullable()) {
+                    // element type is nullable, but Parquet does not support nullable map keys,
+                    // so we configure it as not nullable
+                    elementType = elementType.copy(false);
+                }
                 return ConversionPatterns.mapType(
                         repetition,
                         name,
                         MAP_REPEATED_NAME,
-                        convertToParquetType("key", multisetType.getElementType(), conf),
+                        convertToParquetType("key", elementType, conf),
                         convertToParquetType("value", new IntType(false), conf));
             case ROW:
                 RowType rowType = (RowType) type;
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
index 27f7d2a..cd0bf00 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
@@ -72,6 +72,7 @@
 import static org.apache.flink.formats.parquet.ParquetFileFormatFactory.TIMESTAMP_TIME_UNIT;
 import static org.apache.flink.formats.parquet.ParquetFileFormatFactory.WRITE_INT64_TIMESTAMP;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link ParquetRowDataBuilder} and {@link ParquetRowDataWriter}. */
 class ParquetRowDataWriterTest {
@@ -96,11 +97,16 @@
             RowType.of(
                     new ArrayType(true, new IntType()),
                     new MapType(
-                            true,
                             new VarCharType(VarCharType.MAX_LENGTH),
                             new VarCharType(VarCharType.MAX_LENGTH)),
                     RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType()));
 
+    private static final RowType MAP_ROW_TYPE =
+            RowType.of(
+                    new MapType(
+                            new VarCharType(true, VarCharType.MAX_LENGTH),
+                            new VarCharType(VarCharType.MAX_LENGTH)));
+
     private static final RowType NESTED_ARRAY_MAP_TYPE =
             RowType.of(
                     new IntType(),
@@ -109,7 +115,7 @@
                             true,
                             new MapType(
                                     true,
-                                    new VarCharType(VarCharType.MAX_LENGTH),
+                                    new VarCharType(false, VarCharType.MAX_LENGTH),
                                     new VarCharType(VarCharType.MAX_LENGTH))));
 
     private static final RowType NESTED_ARRAY_ROW_TYPE =
@@ -126,6 +132,11 @@
                     TypeConversions.fromLogicalToDataType(ROW_TYPE));
 
     @SuppressWarnings("unchecked")
+    private static final DataFormatConverters.DataFormatConverter<RowData, Row> MAP_CONVERTER =
+            DataFormatConverters.getConverterForDataType(
+                    TypeConversions.fromLogicalToDataType(MAP_ROW_TYPE));
+
+    @SuppressWarnings("unchecked")
     private static final DataFormatConverters.DataFormatConverter<RowData, Row>
             NESTED_ARRAY_MAP_CONVERTER =
                     DataFormatConverters.getConverterForDataType(
@@ -148,6 +159,8 @@
         nestedArrayAndMapTest(folder, conf, false);
         nestedArrayAndRowTest(folder, conf, true);
         nestedArrayAndRowTest(folder, conf, false);
+        invalidTypeTest(folder, conf, true);
+        invalidTypeTest(folder, conf, false);
     }
 
     @Test
@@ -162,6 +175,8 @@
         nestedArrayAndMapTest(folder, conf, false);
         nestedArrayAndRowTest(folder, conf, true);
         nestedArrayAndRowTest(folder, conf, false);
+        invalidTypeTest(folder, conf, true);
+        invalidTypeTest(folder, conf, false);
     }
 
     @Test
@@ -173,6 +188,8 @@
         innerTest(folder, conf, false);
         complexTypeTest(folder, conf, true);
         complexTypeTest(folder, conf, false);
+        invalidTypeTest(folder, conf, true);
+        invalidTypeTest(folder, conf, false);
     }
 
     private void innerTest(java.nio.file.Path folder, Configuration conf, boolean utcTimestamp)
@@ -241,7 +258,6 @@
         List<Row> rows = new ArrayList<>(number);
         Map<String, String> mapData = new HashMap<>();
         mapData.put("k1", "v1");
-        mapData.put(null, "v2");
         mapData.put("k2", null);
 
         for (int i = 0; i < number; i++) {
@@ -264,6 +280,22 @@
         assertThat(fileContent).isEqualTo(rows);
     }
 
+    public void invalidTypeTest(java.nio.file.Path folder, Configuration conf, boolean utcTimestamp)
+            throws IOException {
+        Path path = new Path(folder.toString(), UUID.randomUUID().toString());
+        ParquetWriterFactory<RowData> factory =
+                ParquetRowDataBuilder.createWriterFactory(MAP_ROW_TYPE, conf, utcTimestamp);
+        final BulkWriter<RowData> rowDataBulkWriter =
+                factory.create(path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE));
+        Map<String, String> mapData = new HashMap<>();
+        mapData.put(null, "v1");
+        final Row row = Row.of(mapData);
+        assertThatThrownBy(
+                        () -> rowDataBulkWriter.addElement(MAP_CONVERTER.toInternal(row)),
+                        "Parquet does not support null keys in maps. See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for more details.")
+                .isInstanceOf(RuntimeException.class);
+    }
+
     public void nestedArrayAndMapTest(
             java.nio.file.Path folder, Configuration conf, boolean utcTimestamp) throws Exception {
         Path path = new Path(folder.toString(), UUID.randomUUID().toString());
@@ -273,7 +305,6 @@
         for (int i = 0; i < number; i++) {
             Integer v = i;
             Map<String, String> mp1 = new HashMap<>();
-            mp1.put(null, "val_" + i);
             Map<String, String> mp2 = new HashMap<>();
             mp2.put("key_" + i, null);
             mp2.put("key@" + i, "val@" + i);