// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Functions that are queryable and searchable via the `\h` command
use std::fmt;
use std::fs::File;
use std::str::FromStr;
use std::sync::Arc;
use arrow::array::{Int64Array, StringArray, TimestampMillisecondArray, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use datafusion::catalog::{Session, TableFunctionImpl};
use datafusion::common::{plan_err, Column};
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::cache::cache_manager::CacheManager;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;
use async_trait::async_trait;
use parquet::basic::ConvertedType;
use parquet::data_type::{ByteArray, FixedLenByteArray};
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::file::statistics::Statistics;
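/// Statements and commands for which `\h` can display detailed help.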
#[derive(Debug)]
pub enum Function {
Select,
Explain,
Show,
CreateTable,
CreateTableAs,
Insert,
DropTable,
}
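/// Every help topic, listed in the alphabetical order used by the `\h` output.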
const ALL_FUNCTIONS: [Function; 7] = [
Function::CreateTable,
Function::CreateTableAs,
Function::DropTable,
Function::Explain,
Function::Insert,
Function::Select,
Function::Show,
];
impl Function {
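    /// Returns the detailed help text (description and syntax) for this command.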
pub fn function_details(&self) -> Result<&str> {
let details = match self {
Function::Select => {
r#"
Command: SELECT
Description: retrieve rows from a table or view
Syntax:
SELECT [ ALL | DISTINCT [ ON ( expression [, ...] ) ] ]
[ * | expression [ [ AS ] output_name ] [, ...] ]
[ FROM from_item [, ...] ]
[ WHERE condition ]
[ GROUP BY [ ALL | DISTINCT ] grouping_element [, ...] ]
[ HAVING condition ]
[ WINDOW window_name AS ( window_definition ) [, ...] ]
[ { UNION | INTERSECT | EXCEPT } [ ALL | DISTINCT ] select ]
[ ORDER BY expression [ ASC | DESC | USING operator ] [ NULLS { FIRST | LAST } ] [, ...] ]
[ LIMIT { count | ALL } ]
[ OFFSET start [ ROW | ROWS ] ]
where from_item can be one of:
[ ONLY ] table_name [ * ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
[ TABLESAMPLE sampling_method ( argument [, ...] ) [ REPEATABLE ( seed ) ] ]
[ LATERAL ] ( select ) [ AS ] alias [ ( column_alias [, ...] ) ]
with_query_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
[ LATERAL ] function_name ( [ argument [, ...] ] )
[ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
[ LATERAL ] function_name ( [ argument [, ...] ] ) [ AS ] alias ( column_definition [, ...] )
[ LATERAL ] function_name ( [ argument [, ...] ] ) AS ( column_definition [, ...] )
[ LATERAL ] ROWS FROM( function_name ( [ argument [, ...] ] ) [ AS ( column_definition [, ...] ) ] [, ...] )
[ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
from_item [ NATURAL ] join_type from_item [ ON join_condition | USING ( join_column [, ...] ) [ AS join_using_alias ] ]
and grouping_element can be one of:
( )
expression
( expression [, ...] )
and with_query is:
with_query_name [ ( column_name [, ...] ) ] AS [ [ NOT ] MATERIALIZED ] ( select | values | insert | update | delete )
TABLE [ ONLY ] table_name [ * ]"#
}
Function::Explain => {
r#"
Command: EXPLAIN
Description: show the execution plan of a statement
Syntax:
EXPLAIN [ ANALYZE ] statement
"#
}
Function::Show => {
r#"
Command: SHOW
Description: show the value of a run-time parameter
Syntax:
SHOW name
"#
}
Function::CreateTable => {
r#"
Command: CREATE TABLE
Description: define a new table
Syntax:
CREATE [ EXTERNAL ] TABLE table_name ( [
{ column_name data_type }
[, ... ]
] )
"#
}
Function::CreateTableAs => {
r#"
Command: CREATE TABLE AS
Description: define a new table from the results of a query
Syntax:
CREATE TABLE table_name
[ (column_name [, ...] ) ]
AS query
[ WITH [ NO ] DATA ]
"#
}
Function::Insert => {
r#"
Command: INSERT
Description: create new rows in a table
Syntax:
INSERT INTO table_name [ ( column_name [, ...] ) ]
{ VALUES ( { expression } [, ...] ) [, ...] }
"#
}
Function::DropTable => {
r#"
Command: DROP TABLE
Description: remove a table
Syntax:
DROP TABLE [ IF EXISTS ] name [, ...]
"#
}
};
Ok(details)
}
}
impl FromStr for Function {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s.trim().to_uppercase().as_str() {
"SELECT" => Self::Select,
"EXPLAIN" => Self::Explain,
"SHOW" => Self::Show,
"CREATE TABLE" => Self::CreateTable,
"CREATE TABLE AS" => Self::CreateTableAs,
"INSERT" => Self::Insert,
"DROP TABLE" => Self::DropTable,
_ => return Err(()),
})
}
}
impl fmt::Display for Function {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Function::Select => write!(f, "SELECT"),
Function::Explain => write!(f, "EXPLAIN"),
Function::Show => write!(f, "SHOW"),
Function::CreateTable => write!(f, "CREATE TABLE"),
Function::CreateTableAs => write!(f, "CREATE TABLE AS"),
Function::Insert => write!(f, "INSERT"),
Function::DropTable => write!(f, "DROP TABLE"),
}
}
}
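/// Prints every available `\h` help topic as a pretty-printed Arrow table.
///
/// A minimal usage sketch (the `\h` dispatch itself lives in the CLI command
/// loop, outside this file):
///
/// ```ignore
/// // `\h` with no argument lists every topic:
/// display_all_functions()?;
/// // `\h select` prints the detailed help for one topic:
/// println!("{}", "select".parse::<Function>().unwrap().function_details()?);
/// ```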
pub fn display_all_functions() -> Result<()> {
println!("Available help:");
let array = StringArray::from(
ALL_FUNCTIONS
.iter()
.map(|f| format!("{f}"))
.collect::<Vec<String>>(),
);
let schema = Schema::new(vec![Field::new("Function", DataType::Utf8, false)]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;
println!("{}", pretty_format_batches(&[batch]).unwrap());
Ok(())
}
/// Table provider backing the `parquet_metadata` table function; holds the
/// pre-computed metadata rows for a single Parquet file.
#[derive(Debug)]
struct ParquetMetadataTable {
schema: SchemaRef,
batch: RecordBatch,
}
#[async_trait]
impl TableProvider for ParquetMetadataTable {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema(&self) -> arrow::datatypes::SchemaRef {
self.schema.clone()
}
fn table_type(&self) -> datafusion::logical_expr::TableType {
datafusion::logical_expr::TableType::Base
}
async fn scan(
&self,
_state: &dyn Session,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(MemorySourceConfig::try_new_exec(
&[vec![self.batch.clone()]],
TableProvider::schema(self),
projection.cloned(),
)?)
}
}
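/// Converts Parquet column statistics into `(min, max)` display strings,
/// decoding byte arrays as UTF-8 when the column's converted type is UTF8.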
fn convert_parquet_statistics(
value: &Statistics,
converted_type: ConvertedType,
) -> (Option<String>, Option<String>) {
match (value, converted_type) {
(Statistics::Boolean(val), _) => (
val.min_opt().map(|v| v.to_string()),
val.max_opt().map(|v| v.to_string()),
),
(Statistics::Int32(val), _) => (
val.min_opt().map(|v| v.to_string()),
val.max_opt().map(|v| v.to_string()),
),
(Statistics::Int64(val), _) => (
val.min_opt().map(|v| v.to_string()),
val.max_opt().map(|v| v.to_string()),
),
(Statistics::Int96(val), _) => (
val.min_opt().map(|v| v.to_string()),
val.max_opt().map(|v| v.to_string()),
),
(Statistics::Float(val), _) => (
val.min_opt().map(|v| v.to_string()),
val.max_opt().map(|v| v.to_string()),
),
(Statistics::Double(val), _) => (
val.min_opt().map(|v| v.to_string()),
val.max_opt().map(|v| v.to_string()),
),
(Statistics::ByteArray(val), ConvertedType::UTF8) => (
byte_array_to_string(val.min_opt()),
byte_array_to_string(val.max_opt()),
),
(Statistics::ByteArray(val), _) => (
val.min_opt().map(|v| v.to_string()),
val.max_opt().map(|v| v.to_string()),
),
(Statistics::FixedLenByteArray(val), ConvertedType::UTF8) => (
fixed_len_byte_array_to_string(val.min_opt()),
fixed_len_byte_array_to_string(val.max_opt()),
),
(Statistics::FixedLenByteArray(val), _) => (
val.min_opt().map(|v| v.to_string()),
val.max_opt().map(|v| v.to_string()),
),
}
}
/// Convert to a string if it has utf8 encoding, otherwise print bytes directly
fn byte_array_to_string(val: Option<&ByteArray>) -> Option<String> {
val.map(|v| {
v.as_utf8()
.map(|s| s.to_string())
.unwrap_or_else(|_e| v.to_string())
})
}
/// Convert to a string if it has utf8 encoding, otherwise print bytes directly
fn fixed_len_byte_array_to_string(val: Option<&FixedLenByteArray>) -> Option<String> {
val.map(|v| {
v.as_utf8()
.map(|s| s.to_string())
.unwrap_or_else(|_e| v.to_string())
})
}
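/// Table function that exposes the metadata of a Parquet file as a table,
/// e.g. `SELECT * FROM parquet_metadata('data.parquet')`.
///
/// A minimal registration sketch (the function name and the `ctx`
/// `SessionContext` below are illustrative, not defined in this file):
///
/// ```ignore
/// ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));
/// ```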
#[derive(Debug)]
pub struct ParquetMetadataFunc {}
impl TableFunctionImpl for ParquetMetadataFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
let filename = match exprs.first() {
Some(Expr::Literal(ScalarValue::Utf8(Some(s)), _)) => s, // single quote: parquet_metadata('x.parquet')
Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
_ => {
return plan_err!(
"parquet_metadata requires string argument as its input"
);
}
};
let file = File::open(filename.clone())?;
let reader = SerializedFileReader::new(file)?;
let metadata = reader.metadata();
let schema = Arc::new(Schema::new(vec![
Field::new("filename", DataType::Utf8, true),
Field::new("row_group_id", DataType::Int64, true),
Field::new("row_group_num_rows", DataType::Int64, true),
Field::new("row_group_num_columns", DataType::Int64, true),
Field::new("row_group_bytes", DataType::Int64, true),
Field::new("column_id", DataType::Int64, true),
Field::new("file_offset", DataType::Int64, true),
Field::new("num_values", DataType::Int64, true),
Field::new("path_in_schema", DataType::Utf8, true),
Field::new("type", DataType::Utf8, true),
Field::new("stats_min", DataType::Utf8, true),
Field::new("stats_max", DataType::Utf8, true),
Field::new("stats_null_count", DataType::Int64, true),
Field::new("stats_distinct_count", DataType::Int64, true),
Field::new("stats_min_value", DataType::Utf8, true),
Field::new("stats_max_value", DataType::Utf8, true),
Field::new("compression", DataType::Utf8, true),
Field::new("encodings", DataType::Utf8, true),
Field::new("index_page_offset", DataType::Int64, true),
Field::new("dictionary_page_offset", DataType::Int64, true),
Field::new("data_page_offset", DataType::Int64, true),
Field::new("total_compressed_size", DataType::Int64, true),
Field::new("total_uncompressed_size", DataType::Int64, true),
]));
// construct record batch from metadata
let mut filename_arr = vec![];
let mut row_group_id_arr = vec![];
let mut row_group_num_rows_arr = vec![];
let mut row_group_num_columns_arr = vec![];
let mut row_group_bytes_arr = vec![];
let mut column_id_arr = vec![];
let mut file_offset_arr = vec![];
let mut num_values_arr = vec![];
let mut path_in_schema_arr = vec![];
let mut type_arr = vec![];
let mut stats_min_arr = vec![];
let mut stats_max_arr = vec![];
let mut stats_null_count_arr = vec![];
let mut stats_distinct_count_arr = vec![];
let mut stats_min_value_arr = vec![];
let mut stats_max_value_arr = vec![];
let mut compression_arr = vec![];
let mut encodings_arr = vec![];
let mut index_page_offset_arr = vec![];
let mut dictionary_page_offset_arr = vec![];
let mut data_page_offset_arr = vec![];
let mut total_compressed_size_arr = vec![];
let mut total_uncompressed_size_arr = vec![];
for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() {
for (col_idx, column) in row_group.columns().iter().enumerate() {
filename_arr.push(filename.clone());
row_group_id_arr.push(rg_idx as i64);
row_group_num_rows_arr.push(row_group.num_rows());
row_group_num_columns_arr.push(row_group.num_columns() as i64);
row_group_bytes_arr.push(row_group.total_byte_size());
column_id_arr.push(col_idx as i64);
file_offset_arr.push(column.file_offset());
num_values_arr.push(column.num_values());
path_in_schema_arr.push(column.column_path().to_string());
type_arr.push(column.column_type().to_string());
let converted_type = column.column_descr().converted_type();
if let Some(s) = column.statistics() {
let (min_val, max_val) =
convert_parquet_statistics(s, converted_type);
stats_min_arr.push(min_val.clone());
stats_max_arr.push(max_val.clone());
stats_null_count_arr.push(s.null_count_opt().map(|c| c as i64));
stats_distinct_count_arr
.push(s.distinct_count_opt().map(|c| c as i64));
stats_min_value_arr.push(min_val);
stats_max_value_arr.push(max_val);
} else {
stats_min_arr.push(None);
stats_max_arr.push(None);
stats_null_count_arr.push(None);
stats_distinct_count_arr.push(None);
stats_min_value_arr.push(None);
stats_max_value_arr.push(None);
};
compression_arr.push(format!("{:?}", column.compression()));
// need to collect into Vec to format
let encodings: Vec<_> = column.encodings().collect();
encodings_arr.push(format!("{:?}", encodings));
index_page_offset_arr.push(column.index_page_offset());
dictionary_page_offset_arr.push(column.dictionary_page_offset());
data_page_offset_arr.push(column.data_page_offset());
total_compressed_size_arr.push(column.compressed_size());
total_uncompressed_size_arr.push(column.uncompressed_size());
}
}
let rb = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(filename_arr)),
Arc::new(Int64Array::from(row_group_id_arr)),
Arc::new(Int64Array::from(row_group_num_rows_arr)),
Arc::new(Int64Array::from(row_group_num_columns_arr)),
Arc::new(Int64Array::from(row_group_bytes_arr)),
Arc::new(Int64Array::from(column_id_arr)),
Arc::new(Int64Array::from(file_offset_arr)),
Arc::new(Int64Array::from(num_values_arr)),
Arc::new(StringArray::from(path_in_schema_arr)),
Arc::new(StringArray::from(type_arr)),
Arc::new(StringArray::from(stats_min_arr)),
Arc::new(StringArray::from(stats_max_arr)),
Arc::new(Int64Array::from(stats_null_count_arr)),
Arc::new(Int64Array::from(stats_distinct_count_arr)),
Arc::new(StringArray::from(stats_min_value_arr)),
Arc::new(StringArray::from(stats_max_value_arr)),
Arc::new(StringArray::from(compression_arr)),
Arc::new(StringArray::from(encodings_arr)),
Arc::new(Int64Array::from(index_page_offset_arr)),
Arc::new(Int64Array::from(dictionary_page_offset_arr)),
Arc::new(Int64Array::from(data_page_offset_arr)),
Arc::new(Int64Array::from(total_compressed_size_arr)),
Arc::new(Int64Array::from(total_uncompressed_size_arr)),
],
)?;
let parquet_metadata = ParquetMetadataTable { schema, batch: rb };
Ok(Arc::new(parquet_metadata))
}
}
/// Table provider backing the `metadata_cache` table function; holds a
/// snapshot of the file metadata cache as a single record batch.
#[derive(Debug)]
struct MetadataCacheTable {
schema: SchemaRef,
batch: RecordBatch,
}
#[async_trait]
impl TableProvider for MetadataCacheTable {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema(&self) -> arrow::datatypes::SchemaRef {
self.schema.clone()
}
fn table_type(&self) -> datafusion::logical_expr::TableType {
datafusion::logical_expr::TableType::Base
}
async fn scan(
&self,
_state: &dyn Session,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(MemorySourceConfig::try_new_exec(
&[vec![self.batch.clone()]],
TableProvider::schema(self),
projection.cloned(),
)?)
}
}
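/// Table function that lists the entries of the file metadata cache,
/// e.g. `SELECT * FROM metadata_cache()`.
///
/// A minimal registration sketch (assuming a `SessionContext` named `ctx`;
/// the function name is whatever the caller registers it under):
///
/// ```ignore
/// let cache_manager = ctx.runtime_env().cache_manager.clone();
/// ctx.register_udtf("metadata_cache", Arc::new(MetadataCacheFunc::new(cache_manager)));
/// ```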
#[derive(Debug)]
pub struct MetadataCacheFunc {
cache_manager: Arc<CacheManager>,
}
impl MetadataCacheFunc {
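    /// Creates a `metadata_cache` table function backed by the given cache manager.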
pub fn new(cache_manager: Arc<CacheManager>) -> Self {
Self { cache_manager }
}
}
impl TableFunctionImpl for MetadataCacheFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
if !exprs.is_empty() {
return plan_err!("metadata_cache should have no arguments");
}
let schema = Arc::new(Schema::new(vec![
Field::new("path", DataType::Utf8, false),
Field::new(
"file_modified",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new("file_size_bytes", DataType::UInt64, false),
Field::new("e_tag", DataType::Utf8, true),
Field::new("version", DataType::Utf8, true),
Field::new("metadata_size_bytes", DataType::UInt64, false),
Field::new("hits", DataType::UInt64, false),
Field::new("extra", DataType::Utf8, true),
]));
// construct record batch from the cache entries
let mut path_arr = vec![];
let mut file_modified_arr = vec![];
let mut file_size_bytes_arr = vec![];
let mut e_tag_arr = vec![];
let mut version_arr = vec![];
let mut metadata_size_bytes = vec![];
let mut hits_arr = vec![];
let mut extra_arr = vec![];
let cached_entries = self.cache_manager.get_file_metadata_cache().list_entries();
for (path, entry) in cached_entries {
path_arr.push(path.to_string());
file_modified_arr
.push(Some(entry.object_meta.last_modified.timestamp_millis()));
file_size_bytes_arr.push(entry.object_meta.size);
e_tag_arr.push(entry.object_meta.e_tag);
version_arr.push(entry.object_meta.version);
metadata_size_bytes.push(entry.size_bytes as u64);
hits_arr.push(entry.hits as u64);
let mut extra = entry
.extra
.iter()
.map(|(k, v)| format!("{k}={v}"))
.collect::<Vec<_>>();
extra.sort();
extra_arr.push(extra.join(" "));
}
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(path_arr)),
Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
Arc::new(UInt64Array::from(file_size_bytes_arr)),
Arc::new(StringArray::from(e_tag_arr)),
Arc::new(StringArray::from(version_arr)),
Arc::new(UInt64Array::from(metadata_size_bytes)),
Arc::new(UInt64Array::from(hits_arr)),
Arc::new(StringArray::from(extra_arr)),
],
)?;
let metadata_cache = MetadataCacheTable { schema, batch };
Ok(Arc::new(metadata_cache))
}
}
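
#[cfg(test)]
mod tests {
    // A minimal sketch of how `\h` topics round-trip between `FromStr` and
    // `Display`; the module and test names are illustrative additions, not
    // part of the upstream test suite.
    use super::Function;
    use std::str::FromStr;

    #[test]
    fn help_topic_round_trip() {
        let f = Function::from_str("  create table as ").expect("known help topic");
        assert_eq!(f.to_string(), "CREATE TABLE AS");
        assert!(Function::from_str("VACUUM").is_err());
    }
}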