// blob: 2e572944868b6431f9907e562955d8487cb56581
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::{
cmp::min,
io::{Cursor, Read, Seek, SeekFrom},
sync::Arc,
};
use byteorder::{ByteOrder, LittleEndian};
use parquet_format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
use thrift::protocol::TCompactInputProtocol;
use crate::basic::ColumnOrder;
use crate::errors::{ParquetError, Result};
use crate::file::{
metadata::*, reader::ChunkReader, DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE,
PARQUET_MAGIC,
};
use crate::schema::types::{self, SchemaDescriptor};
/// Reads the footer of a Parquet file from `chunk_reader` and decodes the file metadata.
///
/// Layout of Parquet file
/// +---------------------------+-----+---+
/// |      Rest of file         |  B  | A |
/// +---------------------------+-----+---+
/// where A: parquet footer, B: parquet metadata.
///
/// The reader first reads up to DEFAULT_FOOTER_READ_SIZE bytes from the end of the file.
/// If that is not enough according to the metadata length stored in the footer, the
/// missing leading bytes are fetched with a second read and chained in front of the
/// bytes that are already buffered.
///
/// # Errors
///
/// Returns an error when:
/// - the file is smaller than the footer,
/// - the trailing magic bytes do not match `PARQUET_MAGIC`,
/// - the metadata length stored in the footer is negative or larger than the file,
/// - reading from `chunk_reader` fails,
/// - the Thrift metadata cannot be decoded, or the schema / row groups cannot be
///   converted from their Thrift representation.
///
/// # Panics
///
/// Panics if the file declares column orders whose count differs from the number of
/// columns in the schema (see `parse_column_orders`).
pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaData> {
    // check file is large enough to hold footer
    let file_size = chunk_reader.len();
    if file_size < (FOOTER_SIZE as u64) {
        return Err(general_err!(
            "Invalid Parquet file. Size is smaller than footer"
        ));
    }

    // Read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end of the file.
    // The clamp is done in u64 so that a file size that does not fit in `usize`
    // (32-bit targets) is not truncated before it is compared.
    let default_end_len = min(DEFAULT_FOOTER_READ_SIZE as u64, file_size) as usize;
    let mut default_end_reader =
        chunk_reader.get_read(file_size - default_end_len as u64, default_end_len)?;
    let mut default_len_end_buf = vec![0; default_end_len];
    default_end_reader.read_exact(&mut default_len_end_buf)?;

    // check this is indeed a parquet file
    // (the size check above guarantees the buffer holds at least the footer, so the
    // slice offsets below cannot underflow)
    if default_len_end_buf[default_end_len - 4..] != PARQUET_MAGIC {
        return Err(general_err!("Invalid Parquet file. Corrupt footer"));
    }

    // get the metadata length from the footer: a little-endian i32 right before the magic
    let metadata_len = LittleEndian::read_i32(
        &default_len_end_buf[default_end_len - 8..default_end_len - 4],
    ) as i64;
    if metadata_len < 0 {
        return Err(general_err!(
            "Invalid Parquet file. Metadata length is less than zero ({})",
            metadata_len
        ));
    }
    let footer_metadata_len = FOOTER_SIZE + metadata_len as usize;

    // The metadata cannot start before the beginning of the file. Compare in u64 so the
    // file size is never narrowed.
    if footer_metadata_len as u64 > file_size {
        return Err(general_err!(
            "Invalid Parquet file. Metadata start is less than zero ({})",
            file_size as i64 - footer_metadata_len as i64
        ));
    }

    // build up the reader covering the entire metadata
    let mut default_end_cursor = Cursor::new(default_len_end_buf);
    let metadata_read: Box<dyn Read> = if footer_metadata_len <= default_end_len {
        // the whole metadata is in the bytes we already read (including the case where
        // it fills the buffer exactly, which needs no additional read)
        default_end_cursor.seek(SeekFrom::End(-(footer_metadata_len as i64)))?;
        Box::new(default_end_cursor)
    } else {
        // the end of file read by default is not long enough, read missing bytes
        let complementary_end_read = chunk_reader.get_read(
            file_size - footer_metadata_len as u64,
            footer_metadata_len - default_end_len,
        )?;
        Box::new(complementary_end_read.chain(default_end_cursor))
    };

    // TODO: row group filtering
    let mut prot = TCompactInputProtocol::new(metadata_read);
    let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
        .map_err(|e| ParquetError::General(format!("Could not parse metadata: {}", e)))?;
    let schema = types::from_thrift(&t_file_metadata.schema)?;
    let schema_descr = Arc::new(SchemaDescriptor::new(schema));

    // Convert every row group, stopping at the first conversion error.
    let row_groups = t_file_metadata
        .row_groups
        .into_iter()
        .map(|rg| RowGroupMetaData::from_thrift(schema_descr.clone(), rg))
        .collect::<Result<Vec<_>>>()?;

    let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_descr);

    let file_metadata = FileMetaData::new(
        t_file_metadata.version,
        t_file_metadata.num_rows,
        t_file_metadata.created_by,
        t_file_metadata.key_value_metadata,
        schema_descr,
        column_orders,
    );
    Ok(ParquetMetaData::new(file_metadata, row_groups))
}
/// Converts the optional Thrift column orders into `ColumnOrder` values, one per
/// column of `schema_descr`.
///
/// Returns `None` when the Thrift metadata carries no column orders.
///
/// # Panics
///
/// Panics with "Column order length mismatch" when the number of Thrift column orders
/// differs from the number of columns in the schema.
fn parse_column_orders(
    t_column_orders: Option<Vec<TColumnOrder>>,
    schema_descr: &SchemaDescriptor,
) -> Option<Vec<ColumnOrder>> {
    // Nothing declared in the file: nothing to translate.
    let orders = t_column_orders?;

    // Should always be the case
    assert_eq!(
        orders.len(),
        schema_descr.num_columns(),
        "Column order length mismatch"
    );

    // Lengths are equal at this point, so pairing the two sequences visits every column.
    let resolved = orders
        .iter()
        .zip(schema_descr.columns().iter())
        .map(|(order, column)| match order {
            TColumnOrder::TYPEORDER(_) => {
                // The sort order is derived from the column's own types.
                ColumnOrder::TYPE_DEFINED_ORDER(ColumnOrder::get_sort_order(
                    column.logical_type(),
                    column.converted_type(),
                    column.physical_type(),
                ))
            }
        })
        .collect();

    Some(resolved)
}
#[cfg(test)]
mod tests {
    use super::*;

    use crate::basic::SortOrder;
    use crate::basic::Type;
    use crate::schema::types::Type as SchemaType;
    use crate::util::test_common::get_temp_file;
    use parquet_format::TypeDefinedOrder;

    /// Runs `parse_metadata` on the file obtained from `get_temp_file(file_name, contents)`
    /// and checks that it fails with exactly `expected`.
    fn assert_parse_fails_with(file_name: &str, contents: &[u8], expected: ParquetError) {
        let test_file = get_temp_file(file_name, contents);
        let outcome = parse_metadata(&test_file);
        assert!(outcome.is_err());
        assert_eq!(outcome.err().unwrap(), expected);
    }

    #[test]
    fn test_parse_metadata_size_smaller_than_footer() {
        // An empty file cannot even hold the footer.
        assert_parse_fails_with(
            "corrupt-1.parquet",
            &[],
            general_err!("Invalid Parquet file. Size is smaller than footer"),
        );
    }

    #[test]
    fn test_parse_metadata_corrupt_footer() {
        // Footer-sized file whose last four bytes are not the magic.
        assert_parse_fails_with(
            "corrupt-2.parquet",
            &[1, 2, 3, 4, 5, 6, 7, 8],
            general_err!("Invalid Parquet file. Corrupt footer"),
        );
    }

    #[test]
    fn test_parse_metadata_invalid_length() {
        // Valid magic, but the little-endian length field decodes to a negative number.
        assert_parse_fails_with(
            "corrupt-3.parquet",
            &[0, 0, 0, 255, b'P', b'A', b'R', b'1'],
            general_err!(
                "Invalid Parquet file. Metadata length is less than zero (-16777216)"
            ),
        );
    }

    #[test]
    fn test_parse_metadata_invalid_start() {
        // Valid magic, but the declared metadata length (255) exceeds the file size.
        assert_parse_fails_with(
            "corrupt-4.parquet",
            &[255, 0, 0, 0, b'P', b'A', b'R', b'1'],
            general_err!("Invalid Parquet file. Metadata start is less than zero (-255)"),
        );
    }

    #[test]
    fn test_metadata_column_orders_parse() {
        // Define simple schema, we do not need to provide logical types.
        let int_col = SchemaType::primitive_type_builder("col1", Type::INT32)
            .build()
            .unwrap();
        let float_col = SchemaType::primitive_type_builder("col2", Type::FLOAT)
            .build()
            .unwrap();
        let mut fields = vec![Arc::new(int_col), Arc::new(float_col)];

        let schema = SchemaType::group_type_builder("schema")
            .with_fields(&mut fields)
            .build()
            .unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        // One type-defined order per column.
        let t_column_orders = Some(vec![
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
        ]);
        let expected = Some(vec![
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
        ]);
        assert_eq!(parse_column_orders(t_column_orders, &schema_descr), expected);

        // Test when no column orders are defined.
        assert_eq!(parse_column_orders(None, &schema_descr), None);
    }

    #[test]
    #[should_panic(expected = "Column order length mismatch")]
    fn test_metadata_column_orders_len_mismatch() {
        // A group without fields, paired with a single column order.
        let schema = SchemaType::group_type_builder("schema").build().unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders =
            Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);

        parse_column_orders(t_column_orders, &schema_descr);
    }
}