GH-3358: Option to set Thrift max message size (#3359)
diff --git a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
index d7a4c33..776fb45 100644
--- a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
+++ b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
@@ -45,6 +45,7 @@
import org.apache.parquet.format.event.TypedConsumer.I64Consumer;
import org.apache.parquet.format.event.TypedConsumer.StringConsumer;
import org.apache.thrift.TBase;
+import org.apache.thrift.TConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
@@ -59,6 +60,7 @@
public class Util {
private static final int INIT_MEM_ALLOC_ENCR_BUFFER = 100;
+ private static final int DEFAULT_MAX_MESSAGE_SIZE = 104857600; // 100 MB
public static void writeColumnIndex(ColumnIndex columnIndex, OutputStream to) throws IOException {
writeColumnIndex(columnIndex, to, null, null);
@@ -156,6 +158,15 @@
return read(from, new FileMetaData(), decryptor, AAD);
}
+ public static FileMetaData readFileMetaData(InputStream from, int maxMessageSize) throws IOException {
+ return readFileMetaData(from, null, null, maxMessageSize);
+ }
+
+ public static FileMetaData readFileMetaData(
+ InputStream from, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize) throws IOException {
+ return read(from, new FileMetaData(), decryptor, AAD, maxMessageSize);
+ }
+
public static void writeColumnMetaData(
ColumnMetaData columnMetaData, OutputStream to, BlockCipher.Encryptor encryptor, byte[] AAD)
throws IOException {
@@ -190,6 +201,18 @@
return md;
}
+ public static FileMetaData readFileMetaData(
+ InputStream from, boolean skipRowGroups, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize)
+ throws IOException {
+ FileMetaData md = new FileMetaData();
+ if (skipRowGroups) {
+ readFileMetaData(from, new DefaultFileMetaDataConsumer(md), skipRowGroups, decryptor, AAD, maxMessageSize);
+ } else {
+ read(from, md, decryptor, AAD, maxMessageSize);
+ }
+ return md;
+ }
+
public static void writeFileCryptoMetaData(
org.apache.parquet.format.FileCryptoMetaData cryptoMetadata, OutputStream to) throws IOException {
write(cryptoMetadata, to, null, null);
@@ -293,6 +316,17 @@
BlockCipher.Decryptor decryptor,
byte[] AAD)
throws IOException {
+ readFileMetaData(input, consumer, skipRowGroups, decryptor, AAD, DEFAULT_MAX_MESSAGE_SIZE);
+ }
+
+ public static void readFileMetaData(
+ final InputStream input,
+ final FileMetaDataConsumer consumer,
+ boolean skipRowGroups,
+ BlockCipher.Decryptor decryptor,
+ byte[] AAD,
+ int maxMessageSize)
+ throws IOException {
try {
DelegatingFieldConsumer eventConsumer = fieldConsumer()
.onField(VERSION, new I32Consumer() {
@@ -358,26 +392,54 @@
byte[] plainText = decryptor.decrypt(input, AAD);
from = new ByteArrayInputStream(plainText);
}
- new EventBasedThriftReader(protocol(from)).readStruct(eventConsumer);
+ new EventBasedThriftReader(protocol(from, maxMessageSize)).readStruct(eventConsumer);
} catch (TException e) {
throw new IOException("can not read FileMetaData: " + e.getMessage(), e);
}
}
private static TProtocol protocol(OutputStream to) throws TTransportException {
- return protocol(new TIOStreamTransport(to));
+ return protocol(new TIOStreamTransport(to), DEFAULT_MAX_MESSAGE_SIZE);
}
private static TProtocol protocol(InputStream from) throws TTransportException {
- return protocol(new TIOStreamTransport(from));
+ return protocol(new TIOStreamTransport(from), DEFAULT_MAX_MESSAGE_SIZE);
}
- private static InterningProtocol protocol(TIOStreamTransport t) {
+ private static TProtocol protocol(InputStream from, int maxMessageSize) throws TTransportException {
+ return protocol(new TIOStreamTransport(from), maxMessageSize);
+ }
+
+ private static InterningProtocol protocol(TIOStreamTransport t, int configuredMaxMessageSize)
+ throws TTransportException {
+ int maxMessageSize = configuredMaxMessageSize;
+ if (configuredMaxMessageSize == -1) {
+ // -1 means "not configured": fall back to the default 100 MB
+ maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
+ } else if (configuredMaxMessageSize <= 0) {
+ // Any other non-positive value is a caller error; the -1 sentinel must not reach this branch
+ throw new IllegalArgumentException("Max message size must be positive: " + configuredMaxMessageSize);
+ }
+
+ TConfiguration config = t.getConfiguration();
+ config.setMaxMessageSize(maxMessageSize);
+ /*
+ Reset known message size to 0 to force checking against the max message size.
+ This is necessary when reusing the same transport for multiple reads/writes,
+ as the known message size may be larger than the max message size.
+ */
+ t.updateKnownMessageSize(0);
return new InterningProtocol(new TCompactProtocol(t));
}
private static <T extends TBase<?, ?>> T read(
final InputStream input, T tbase, BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
+ return read(input, tbase, decryptor, AAD, DEFAULT_MAX_MESSAGE_SIZE);
+ }
+
+ private static <T extends TBase<?, ?>> T read(
+ final InputStream input, T tbase, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize)
+ throws IOException {
final InputStream from;
if (null == decryptor) {
from = input;
@@ -387,7 +449,7 @@
}
try {
- tbase.read(protocol(from));
+ tbase.read(protocol(from, maxMessageSize));
return tbase;
} catch (TException e) {
throw new IOException("can not read " + tbase.getClass() + ": " + e.getMessage(), e);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 10728df..002028c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -147,6 +147,15 @@
public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k
+ /**
+ * Configuration property to control the Thrift max message size when reading Parquet metadata.
+ * This is useful for files with very large metadata.
+ * Default value is 100 MB.
+ */
+ public static final String PARQUET_THRIFT_STRING_SIZE_LIMIT = "parquet.thrift.string.size.limit";
+
+ private static final int DEFAULT_MAX_MESSAGE_SIZE = 104857600; // 100 MB
+
private static final Logger LOG = LoggerFactory.getLogger(ParquetMetadataConverter.class);
private static final LogicalTypeConverterVisitor LOGICAL_TYPE_ANNOTATION_VISITOR =
new LogicalTypeConverterVisitor();
@@ -154,6 +163,7 @@
new ConvertedTypeConverterVisitor();
private final int statisticsTruncateLength;
private final boolean useSignedStringMinMax;
+ private final ParquetReadOptions options;
public ParquetMetadataConverter() {
this(false);
@@ -173,7 +183,7 @@
}
public ParquetMetadataConverter(ParquetReadOptions options) {
- this(options.useSignedStringMinMax());
+ this(options.useSignedStringMinMax(), ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH, options);
}
private ParquetMetadataConverter(boolean useSignedStringMinMax) {
@@ -181,11 +191,30 @@
}
private ParquetMetadataConverter(boolean useSignedStringMinMax, int statisticsTruncateLength) {
+ this(useSignedStringMinMax, statisticsTruncateLength, null);
+ }
+
+ private ParquetMetadataConverter(
+ boolean useSignedStringMinMax, int statisticsTruncateLength, ParquetReadOptions options) {
if (statisticsTruncateLength <= 0) {
throw new IllegalArgumentException("Truncate length should be greater than 0");
}
this.useSignedStringMinMax = useSignedStringMinMax;
this.statisticsTruncateLength = statisticsTruncateLength;
+ this.options = options;
+ }
+
+ /**
+ * Gets the configured max message size for Thrift deserialization.
+ * Reads the value from the ParquetReadOptions configuration; returns -1 when no configuration is available.
+ *
+ * @return the max message size in bytes, or -1 to use the default
+ */
+ private int getMaxMessageSize() {
+ if (options != null && options.getConfiguration() != null) {
+ return options.getConfiguration().getInt(PARQUET_THRIFT_STRING_SIZE_LIMIT, DEFAULT_MAX_MESSAGE_SIZE);
+ }
+ return -1;
}
// NOTE: this cache is for memory savings, not cpu savings, and is used to de-duplicate
@@ -1694,21 +1723,27 @@
filter.accept(new MetadataFilterVisitor<FileMetaDataAndRowGroupOffsetInfo, IOException>() {
@Override
public FileMetaDataAndRowGroupOffsetInfo visit(NoFilter filter) throws IOException {
- FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+ int maxMessageSize = getMaxMessageSize();
+ FileMetaData fileMetadata =
+ readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
return new FileMetaDataAndRowGroupOffsetInfo(
fileMetadata, generateRowGroupOffsets(fileMetadata));
}
@Override
public FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter filter) throws IOException {
- FileMetaData fileMetadata = readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
+ int maxMessageSize = getMaxMessageSize();
+ FileMetaData fileMetadata =
+ readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD, maxMessageSize);
return new FileMetaDataAndRowGroupOffsetInfo(
fileMetadata, generateRowGroupOffsets(fileMetadata));
}
@Override
public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException {
- FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+ int maxMessageSize = getMaxMessageSize();
+ FileMetaData fileMetadata =
+ readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
// We must generate the map *before* filtering because it modifies `fileMetadata`.
Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter);
@@ -1717,7 +1752,9 @@
@Override
public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException {
- FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+ int maxMessageSize = getMaxMessageSize();
+ FileMetaData fileMetadata =
+ readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
// We must generate the map *before* filtering because it modifies `fileMetadata`.
Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter);
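
Reviewer note (not part of the patch): a sketch of how the new property travels from a Hadoop Configuration into the converter, mirroring what the test below exercises; the 200 MB value is illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.HadoopReadOptions;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.format.converter.ParquetMetadataConverter;

    Configuration conf = new Configuration();
    // Same key as ParquetMetadataConverter.PARQUET_THRIFT_STRING_SIZE_LIMIT.
    conf.setInt("parquet.thrift.string.size.limit", 200 * 1024 * 1024);
    ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
    // getMaxMessageSize() in the converter reads the limit and passes it to Util.readFileMetaData.
    ParquetMetadataConverter converter = new ParquetMetadataConverter(options);
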
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderMaxMessageSize.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderMaxMessageSize.java
new file mode 100644
index 0000000..f9f121b
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderMaxMessageSize.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestParquetFileReaderMaxMessageSize {
+
+ public static Path TEST_FILE;
+ public MessageType schema;
+
+ @Rule
+ public final TemporaryFolder temp = new TemporaryFolder();
+
+ @Before
+ public void setup() throws IOException {
+ File testParquetFile = temp.newFile();
+ testParquetFile.delete();
+
+ TEST_FILE = new Path(testParquetFile.toURI());
+ // Create a file with many columns
+ StringBuilder schemaBuilder = new StringBuilder("message test_schema {");
+ for (int i = 0; i < 2000; i++) {
+ schemaBuilder.append("required int64 col_").append(i).append(";");
+ }
+ schemaBuilder.append("}");
+
+ schema = MessageTypeParser.parseMessageType(schemaBuilder.toString());
+
+ Configuration conf = new Configuration();
+ GroupWriteSupport.setSchema(schema, conf);
+
+ try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(HadoopOutputFile.fromPath(TEST_FILE, conf))
+ .withConf(conf)
+ .withType(schema)
+ .build()) {
+
+ SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+ Group group = factory.newGroup();
+ for (int col = 0; col < 2000; col++) {
+ group.append("col_" + col, 1L);
+ }
+ writer.write(group);
+ }
+ }
+
+ /**
+ * Test reading a file with many columns using a custom max message size
+ */
+ @Test
+ public void testReadFileWithManyColumns() throws IOException {
+ Configuration readConf = new Configuration();
+ readConf.setInt("parquet.thrift.string.size.limit", 200 * 1024 * 1024);
+
+ ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+ try (ParquetFileReader reader =
+ ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+
+ ParquetMetadata metadata = reader.getFooter();
+ assertNotNull(metadata);
+ assertEquals(schema, metadata.getFileMetaData().getSchema());
+ assertTrue(metadata.getBlocks().size() > 0);
+ }
+ }
+
+ /**
+ * Test that the default configuration works for normal files
+ */
+ @Test
+ public void testReadNormalFileWithDefaultConfig() throws IOException {
+ // Read with default configuration (no custom max message size)
+ Configuration readConf = new Configuration();
+ ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+ try (ParquetFileReader reader =
+ ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+
+ ParquetMetadata metadata = reader.getFooter();
+ assertNotNull(metadata);
+ assertEquals(1, metadata.getBlocks().get(0).getRowCount());
+ }
+ }
+
+ /**
+ * Test that an insufficient max message size produces an error
+ */
+ @Test
+ public void testInsufficientMaxMessageSizeError() throws IOException {
+ // Try to read with very small max message size
+ Configuration readConf = new Configuration();
+ readConf.setInt("parquet.thrift.string.size.limit", 1); // Only 1 byte
+
+ ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+ try (ParquetFileReader reader =
+ ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+ fail("Should have thrown Message size exceeds limit due to MaxMessageSize");
+ } catch (IOException e) {
+ e.printStackTrace();
+ assertTrue(
+ "Error should mention TTransportException",
+ e.getMessage().contains("Message size exceeds limit")
+ || e.getCause().getMessage().contains("Message size exceeds limit")
+ || e.getMessage().contains("MaxMessageSize reached")
+ || e.getCause().getMessage().contains("MaxMessageSize reached"));
+ }
+ }
+}
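
Reviewer note (not part of the patch): the messages matched by the last test come from Thrift itself. Since 0.14, transports derived from TEndpointTransport (including TIOStreamTransport) check each read against TConfiguration's max message size and throw a TTransportException whose message starts with "MaxMessageSize reached"; Util then wraps it in the IOException asserted above. A minimal standalone illustration, assuming Thrift >= 0.14; the 8-byte cap and 16-byte payload are arbitrary.

    import java.io.ByteArrayInputStream;
    import org.apache.thrift.TConfiguration;
    import org.apache.thrift.transport.TIOStreamTransport;

    TConfiguration conf = new TConfiguration();
    conf.setMaxMessageSize(8); // cap below the payload size
    TIOStreamTransport t = new TIOStreamTransport(conf, new ByteArrayInputStream(new byte[16]));
    t.read(new byte[16], 0, 16); // throws TTransportException: MaxMessageSize reached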