GH-3358: Option to set Thrift max message size (#3359)

diff --git a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
index d7a4c33..776fb45 100644
--- a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
+++ b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
@@ -45,6 +45,7 @@
 import org.apache.parquet.format.event.TypedConsumer.I64Consumer;
 import org.apache.parquet.format.event.TypedConsumer.StringConsumer;
 import org.apache.thrift.TBase;
+import org.apache.thrift.TConfiguration;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
@@ -59,6 +60,7 @@
 public class Util {
 
   private static final int INIT_MEM_ALLOC_ENCR_BUFFER = 100;
+  private static final int DEFAULT_MAX_MESSAGE_SIZE = 104857600; // 100 MB
 
   public static void writeColumnIndex(ColumnIndex columnIndex, OutputStream to) throws IOException {
     writeColumnIndex(columnIndex, to, null, null);
@@ -156,6 +158,15 @@
     return read(from, new FileMetaData(), decryptor, AAD);
   }
 
+  public static FileMetaData readFileMetaData(InputStream from, int maxMessageSize) throws IOException {
+    return readFileMetaData(from, null, null, maxMessageSize);
+  }
+
+  public static FileMetaData readFileMetaData(
+      InputStream from, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize) throws IOException {
+    return read(from, new FileMetaData(), decryptor, AAD, maxMessageSize);
+  }
+
   public static void writeColumnMetaData(
       ColumnMetaData columnMetaData, OutputStream to, BlockCipher.Encryptor encryptor, byte[] AAD)
       throws IOException {
@@ -190,6 +201,18 @@
     return md;
   }
 
+  public static FileMetaData readFileMetaData(
+      InputStream from, boolean skipRowGroups, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize)
+      throws IOException {
+    FileMetaData md = new FileMetaData();
+    if (skipRowGroups) {
+      readFileMetaData(from, new DefaultFileMetaDataConsumer(md), skipRowGroups, decryptor, AAD, maxMessageSize);
+    } else {
+      read(from, md, decryptor, AAD, maxMessageSize);
+    }
+    return md;
+  }
+
   public static void writeFileCryptoMetaData(
       org.apache.parquet.format.FileCryptoMetaData cryptoMetadata, OutputStream to) throws IOException {
     write(cryptoMetadata, to, null, null);
@@ -293,6 +316,17 @@
       BlockCipher.Decryptor decryptor,
       byte[] AAD)
       throws IOException {
+    readFileMetaData(input, consumer, skipRowGroups, decryptor, AAD, DEFAULT_MAX_MESSAGE_SIZE);
+  }
+
+  public static void readFileMetaData(
+      final InputStream input,
+      final FileMetaDataConsumer consumer,
+      boolean skipRowGroups,
+      BlockCipher.Decryptor decryptor,
+      byte[] AAD,
+      int maxMessageSize)
+      throws IOException {
     try {
       DelegatingFieldConsumer eventConsumer = fieldConsumer()
           .onField(VERSION, new I32Consumer() {
@@ -358,26 +392,54 @@
         byte[] plainText = decryptor.decrypt(input, AAD);
         from = new ByteArrayInputStream(plainText);
       }
-      new EventBasedThriftReader(protocol(from)).readStruct(eventConsumer);
+      new EventBasedThriftReader(protocol(from, maxMessageSize)).readStruct(eventConsumer);
     } catch (TException e) {
       throw new IOException("can not read FileMetaData: " + e.getMessage(), e);
     }
   }
 
   private static TProtocol protocol(OutputStream to) throws TTransportException {
-    return protocol(new TIOStreamTransport(to));
+    return protocol(new TIOStreamTransport(to), DEFAULT_MAX_MESSAGE_SIZE);
   }
 
   private static TProtocol protocol(InputStream from) throws TTransportException {
-    return protocol(new TIOStreamTransport(from));
+    return protocol(new TIOStreamTransport(from), DEFAULT_MAX_MESSAGE_SIZE);
   }
 
-  private static InterningProtocol protocol(TIOStreamTransport t) {
+  private static TProtocol protocol(InputStream from, int maxMessageSize) throws TTransportException {
+    return protocol(new TIOStreamTransport(from), maxMessageSize);
+  }
+
+  private static InterningProtocol protocol(TIOStreamTransport t, int configuredMaxMessageSize)
+      throws TTransportException, NumberFormatException {
+    int maxMessageSize = configuredMaxMessageSize;
+    if (configuredMaxMessageSize == -1) {
+      // Set to default 100 MB
+      maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
+    }
+    if (configuredMaxMessageSize <= 0) {
+      throw new NumberFormatException("Max message size must be positive: " + configuredMaxMessageSize);
+    }
+
+    TConfiguration config = t.getConfiguration();
+    config.setMaxMessageSize(maxMessageSize);
+    /*
+    Reset known message size to 0 to force checking against the max message size.
+    This is necessary when reusing the same transport for multiple reads/writes,
+    as the known message size may be larger than the max message size.
+    */
+    t.updateKnownMessageSize(0);
     return new InterningProtocol(new TCompactProtocol(t));
   }
 
   private static <T extends TBase<?, ?>> T read(
       final InputStream input, T tbase, BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
+    return read(input, tbase, decryptor, AAD, DEFAULT_MAX_MESSAGE_SIZE);
+  }
+
+  private static <T extends TBase<?, ?>> T read(
+      final InputStream input, T tbase, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize)
+      throws IOException {
     final InputStream from;
     if (null == decryptor) {
       from = input;
@@ -387,7 +449,7 @@
     }
 
     try {
-      tbase.read(protocol(from));
+      tbase.read(protocol(from, maxMessageSize));
       return tbase;
     } catch (TException e) {
       throw new IOException("can not read " + tbase.getClass() + ": " + e.getMessage(), e);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 10728df..002028c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -147,6 +147,15 @@
   public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
   public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k
 
+  /**
+   * Configuration property to control the Thrift max message size when reading Parquet metadata.
+   * This is useful for files with very large metadata
+   * Default value is 100 MB.
+   */
+  public static final String PARQUET_THRIFT_STRING_SIZE_LIMIT = "parquet.thrift.string.size.limit";
+
+  private static final int DEFAULT_MAX_MESSAGE_SIZE = 104857600; // 100 MB
+
   private static final Logger LOG = LoggerFactory.getLogger(ParquetMetadataConverter.class);
   private static final LogicalTypeConverterVisitor LOGICAL_TYPE_ANNOTATION_VISITOR =
       new LogicalTypeConverterVisitor();
@@ -154,6 +163,7 @@
       new ConvertedTypeConverterVisitor();
   private final int statisticsTruncateLength;
   private final boolean useSignedStringMinMax;
+  private final ParquetReadOptions options;
 
   public ParquetMetadataConverter() {
     this(false);
@@ -173,7 +183,7 @@
   }
 
   public ParquetMetadataConverter(ParquetReadOptions options) {
-    this(options.useSignedStringMinMax());
+    this(options.useSignedStringMinMax(), ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH, options);
   }
 
   private ParquetMetadataConverter(boolean useSignedStringMinMax) {
@@ -181,11 +191,30 @@
   }
 
   private ParquetMetadataConverter(boolean useSignedStringMinMax, int statisticsTruncateLength) {
+    this(useSignedStringMinMax, statisticsTruncateLength, null);
+  }
+
+  private ParquetMetadataConverter(
+      boolean useSignedStringMinMax, int statisticsTruncateLength, ParquetReadOptions options) {
     if (statisticsTruncateLength <= 0) {
       throw new IllegalArgumentException("Truncate length should be greater than 0");
     }
     this.useSignedStringMinMax = useSignedStringMinMax;
     this.statisticsTruncateLength = statisticsTruncateLength;
+    this.options = options;
+  }
+
+  /**
+   * Gets the configured max message size for Thrift deserialization.
+   * Reads from ParquetReadOptions configuration, or returns -1 if not available.
+   *
+   * @return the max message size in bytes, or -1 to use the default
+   */
+  private int getMaxMessageSize() {
+    if (options != null && options.getConfiguration() != null) {
+      return options.getConfiguration().getInt(PARQUET_THRIFT_STRING_SIZE_LIMIT, DEFAULT_MAX_MESSAGE_SIZE);
+    }
+    return -1;
   }
 
   // NOTE: this cache is for memory savings, not cpu savings, and is used to de-duplicate
@@ -1694,21 +1723,27 @@
         filter.accept(new MetadataFilterVisitor<FileMetaDataAndRowGroupOffsetInfo, IOException>() {
           @Override
           public FileMetaDataAndRowGroupOffsetInfo visit(NoFilter filter) throws IOException {
-            FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+            int maxMessageSize = getMaxMessageSize();
+            FileMetaData fileMetadata =
+                readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
             return new FileMetaDataAndRowGroupOffsetInfo(
                 fileMetadata, generateRowGroupOffsets(fileMetadata));
           }
 
           @Override
           public FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter filter) throws IOException {
-            FileMetaData fileMetadata = readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
+            int maxMessageSize = getMaxMessageSize();
+            FileMetaData fileMetadata =
+                readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD, maxMessageSize);
             return new FileMetaDataAndRowGroupOffsetInfo(
                 fileMetadata, generateRowGroupOffsets(fileMetadata));
           }
 
           @Override
           public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException {
-            FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+            int maxMessageSize = getMaxMessageSize();
+            FileMetaData fileMetadata =
+                readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
             // We must generate the map *before* filtering because it modifies `fileMetadata`.
             Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
             FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter);
@@ -1717,7 +1752,9 @@
 
           @Override
           public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException {
-            FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+            int maxMessageSize = getMaxMessageSize();
+            FileMetaData fileMetadata =
+                readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
             // We must generate the map *before* filtering because it modifies `fileMetadata`.
             Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
             FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter);
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderMaxMessageSize.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderMaxMessageSize.java
new file mode 100644
index 0000000..f9f121b
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderMaxMessageSize.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestParquetFileReaderMaxMessageSize {
+
+  public static Path TEST_FILE;
+  public MessageType schema;
+
+  @Rule
+  public final TemporaryFolder temp = new TemporaryFolder();
+
+  @Before
+  public void testSetup() throws IOException {
+
+    File testParquetFile = temp.newFile();
+    testParquetFile.delete();
+
+    TEST_FILE = new Path(testParquetFile.toURI());
+    // Create a file with many columns
+    StringBuilder schemaBuilder = new StringBuilder("message test_schema {");
+    for (int i = 0; i < 2000; i++) {
+      schemaBuilder.append("required int64 col_").append(i).append(";");
+    }
+    schemaBuilder.append("}");
+
+    schema = MessageTypeParser.parseMessageType(schemaBuilder.toString());
+
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(HadoopOutputFile.fromPath(TEST_FILE, conf))
+        .withConf(conf)
+        .withType(schema)
+        .build()) {
+
+      SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+      Group group = factory.newGroup();
+      for (int col = 0; col < 2000; col++) {
+        group.append("col_" + col, 1L);
+      }
+      writer.write(group);
+    }
+  }
+
+  /**
+   * Test reading a file with many columns using custom max message size
+   */
+  @Test
+  public void testReadFileWithManyColumns() throws IOException {
+    Configuration readConf = new Configuration();
+    readConf.setInt("parquet.thrift.string.size.limit", 200 * 1024 * 1024);
+
+    ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+
+      ParquetMetadata metadata = reader.getFooter();
+      assertNotNull(metadata);
+      assertEquals(schema, metadata.getFileMetaData().getSchema());
+      assertTrue(metadata.getBlocks().size() > 0);
+    }
+  }
+
+  /**
+   * Test that default configuration works for normal files
+   */
+  @Test
+  public void testReadNormalFileWithDefaultConfig() throws IOException {
+    // Read with default configuration (no custom max message size)
+    Configuration readConf = new Configuration();
+    ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+
+      ParquetMetadata metadata = reader.getFooter();
+      assertNotNull(metadata);
+      assertEquals(1, metadata.getBlocks().get(0).getRowCount());
+    }
+  }
+
+  /**
+   * Test that insufficient max message size produces error
+   */
+  @Test
+  public void testInsufficientMaxMessageSizeError() throws IOException {
+    // Try to read with very small max message size
+    Configuration readConf = new Configuration();
+    readConf.setInt("parquet.thrift.string.size.limit", 1); // Only 1 byte
+
+    ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+      fail("Should have thrown Message size exceeds limit due to MaxMessageSize");
+    } catch (IOException e) {
+      e.printStackTrace();
+      assertTrue(
+          "Error should mention TTransportException",
+          e.getMessage().contains("Message size exceeds limit")
+              || e.getCause().getMessage().contains("Message size exceeds limit")
+              || e.getMessage().contains("MaxMessageSize reached")
+              || e.getCause().getMessage().contains("MaxMessageSize reached"));
+    }
+  }
+}