// blob: f545ccf19306a7a98f5d186f762e75894e5d190d [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#![cfg_attr(test, allow(clippy::needless_pass_by_value))]
#![doc(
html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
extern crate wasm_bindgen;
use datafusion_common::ScalarValue;
use datafusion_expr::lit;
use datafusion_expr::simplify::SimplifyContext;
use datafusion_optimizer::simplify_expressions::ExprSimplifier;
use datafusion_sql::sqlparser::dialect::GenericDialect;
use datafusion_sql::sqlparser::parser::Parser;
use wasm_bindgen::prelude::*;
/// Install a panic hook that forwards Rust panic messages to the browser
/// console, making wasm panics readable instead of opaque "unreachable" traps.
///
/// This is a no-op unless the crate is built with the
/// `console_error_panic_hook` feature. Safe to call more than once — the hook
/// is only registered the first time.
pub fn set_panic_hook() {
    // Registering once during initialization is enough; see
    // https://github.com/rustwasm/console_error_panic_hook#readme
    #[cfg(feature = "console_error_panic_hook")]
    console_error_panic_hook::set_once();
}
/// Make console.log available as the log Rust function
// wasm-bindgen generates the JS glue for this foreign block; each call to
// `log` below becomes a `console.log(s)` call in the hosting browser.
#[wasm_bindgen]
extern "C" {
    // Bound to the `console` namespace, so `log("x")` invokes `console.log("x")`.
    #[wasm_bindgen(js_namespace = console)]
    fn log(s: &str);
}
/// Demo entry point exported to JS: builds a scalar, an expression, and a
/// simplified expression, logging each to the browser console.
#[wasm_bindgen]
pub fn basic_exprs() {
    set_panic_hook();

    // A scalar value (from datafusion-common), echoed to the console.
    let scalar = ScalarValue::from("Hello, World!");
    log(&format!("ScalarValue: {scalar:?}"));

    // A literal arithmetic expression (from datafusion-expr).
    let expr = lit(28) + lit(72);
    log(&format!("Expr: {expr:?}"));

    // Constant-fold the expression (datafusion-optimizer); the simplifier is
    // built inline since it is only needed for this single call.
    let simplified_expr = ExprSimplifier::new(SimplifyContext::default())
        .simplify(expr)
        .unwrap();
    log(&format!("Simplified Expr: {simplified_expr:?}"));
}
/// Demo entry point exported to JS: parses a SQL statement with
/// datafusion-sql's parser and logs the resulting AST to the console.
#[wasm_bindgen]
pub fn basic_parse() {
    // GenericDialect is the permissive default; any sqlparser dialect
    // (e.g. AnsiDialect, or a custom one) would work here.
    let dialect = GenericDialect {};
    let ast = Parser::parse_sql(&dialect, "SELECT 2 + 37").unwrap();
    log(&format!("Parsed SQL: {ast:?}"));
}
#[cfg(test)]
mod test {
    use std::sync::Arc;
    use bytes::Bytes;
    use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
    use datafusion::{
        arrow::{
            array::{ArrayRef, Int32Array, RecordBatch, StringArray},
            datatypes::{DataType, Field, Schema},
        },
        datasource::MemTable,
        execution::context::SessionContext,
        prelude::CsvReadOptions,
    };
    use datafusion_common::{DataFusionError, test_util::batches_to_string};
    use datafusion_execution::{
        config::SessionConfig,
        disk_manager::{DiskManagerBuilder, DiskManagerMode},
        runtime_env::RuntimeEnvBuilder,
    };
    use datafusion_physical_plan::collect;
    use datafusion_sql::parser::DFParser;
    use futures::{StreamExt, TryStreamExt, stream};
    use object_store::{ObjectStoreExt, PutPayload, memory::InMemory, path::Path};
    use url::Url;
    use wasm_bindgen_test::wasm_bindgen_test;

    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Expected pretty-printed output for the three-row `(id, value)` table
    /// produced by `create_test_data` (shared by the Parquet and CSV tests).
    const EXPECTED_TABLE: &str = "+----+-------+\n\
                                  | id | value |\n\
                                  +----+-------+\n\
                                  | 1  | a     |\n\
                                  | 2  | b     |\n\
                                  | 3  | c     |\n\
                                  +----+-------+";

    /// Browser-only smoke test for the wasm-exported demo functions.
    #[cfg(target_arch = "wasm32")]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
    fn datafusion_test() {
        super::basic_exprs();
        super::basic_parse();
    }

    /// Session with the disk manager disabled: wasm has no filesystem, so any
    /// operator that would spill to disk must be prevented from trying.
    fn get_ctx() -> Arc<SessionContext> {
        let rt = RuntimeEnvBuilder::new()
            .with_disk_manager_builder(
                DiskManagerBuilder::default().with_mode(DiskManagerMode::Disabled),
            )
            .build_arc()
            .unwrap();
        let session_config = SessionConfig::new().with_target_partitions(1);
        Arc::new(SessionContext::new_with_config_rt(session_config, rt))
    }

    /// Three-row `(id: Int32, value: Utf8)` schema and batch used as shared
    /// fixture data.
    fn create_test_data() -> (Arc<Schema>, RecordBatch) {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("value", DataType::Utf8, false),
        ]));
        let data: Vec<ArrayRef> = vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ];
        let batch = RecordBatch::try_new(schema.clone(), data).unwrap();
        (schema, batch)
    }

    /// Parse `sql`, plan it to a logical then physical plan, and execute the
    /// plan to completion on `ctx`. Panics on any failure along the way.
    /// Deduplicates the pipeline previously copy-pasted across three tests.
    async fn plan_and_collect(ctx: &SessionContext, sql: &str) {
        let statement = DFParser::parse_sql(sql).unwrap().pop_back().unwrap();
        let logical_plan = ctx.state().statement_to_plan(statement).await.unwrap();
        let data_frame = ctx.execute_logical_plan(logical_plan).await.unwrap();
        let physical_plan = data_frame.create_physical_plan().await.unwrap();
        let task_ctx = ctx.task_ctx();
        let _ = collect(physical_plan, task_ctx).await.unwrap();
    }

    /// Serialize `batch` into an in-memory Parquet file and return its bytes.
    fn write_parquet_buffer(schema: Arc<Schema>, batch: &RecordBatch) -> Vec<u8> {
        let mut buffer = Vec::new();
        let mut writer =
            datafusion::parquet::arrow::ArrowWriter::try_new(&mut buffer, schema, None)
                .unwrap();
        writer.write(batch).unwrap();
        writer.close().unwrap();
        buffer
    }

    #[wasm_bindgen_test(unsupported = tokio::test)]
    async fn basic_execute() {
        // Execute a constant expression end to end (using datafusion).
        plan_and_collect(&get_ctx(), "SELECT 2 + 2;").await;
    }

    #[wasm_bindgen_test(unsupported = tokio::test)]
    async fn basic_df_function_execute() {
        // Scalar function evaluation must also work under wasm.
        plan_and_collect(&get_ctx(), "SELECT abs(-1.0);").await;
    }

    #[wasm_bindgen_test(unsupported = tokio::test)]
    async fn test_basic_aggregate() {
        // Window/aggregate execution over a registered in-memory table.
        // Uses a single-row table (distinct from `create_test_data`).
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("value", DataType::Utf8, false),
        ]));
        let data: Vec<ArrayRef> = vec![
            Arc::new(Int32Array::from(vec![1])),
            Arc::new(StringArray::from(vec!["a"])),
        ];
        let batch = RecordBatch::try_new(schema.clone(), data).unwrap();
        let table = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap();
        let ctx = get_ctx();
        ctx.register_table("test_table", Arc::new(table)).unwrap();
        plan_and_collect(
            &ctx,
            "SELECT FIRST_VALUE(value) OVER (ORDER BY id) as first_val FROM test_table;",
        )
        .await;
    }

    #[wasm_bindgen_test(unsupported = tokio::test)]
    async fn test_parquet_write() {
        // Parquet writing must work without touching a filesystem; assert the
        // buffer was actually populated rather than only checking for panics.
        let (schema, batch) = create_test_data();
        let buffer = write_parquet_buffer(schema, &batch);
        assert!(!buffer.is_empty());
    }

    #[wasm_bindgen_test(unsupported = tokio::test)]
    async fn test_parquet_read_and_write() {
        // Round-trip: write Parquet into an in-memory object store, register
        // it with the session, and read it back via SQL.
        let (schema, batch) = create_test_data();
        let buffer = write_parquet_buffer(schema, &batch);
        let session_ctx = SessionContext::new();
        let store = InMemory::new();
        let path = Path::from("a.parquet");
        store.put(&path, buffer.into()).await.unwrap();
        let url = Url::parse("memory://").unwrap();
        session_ctx.register_object_store(&url, Arc::new(store));
        session_ctx
            .register_parquet("a", "memory:///a.parquet", Default::default())
            .await
            .unwrap();
        let df = session_ctx.sql("SELECT * FROM a").await.unwrap();
        let result = df.collect().await.unwrap();
        assert_eq!(batches_to_string(&result), EXPECTED_TABLE);
    }

    #[wasm_bindgen_test(unsupported = tokio::test)]
    async fn test_csv_read_xz_compressed() {
        // Compress CSV bytes with XZ in memory, store them in an in-memory
        // object store, and read them back through the CSV reader.
        let csv_data = "id,value\n1,a\n2,b\n3,c\n";
        let input = Bytes::from(csv_data.as_bytes().to_vec());
        let input_stream =
            stream::iter(vec![Ok::<Bytes, DataFusionError>(input)]).boxed();
        let compressed_stream = FileCompressionType::XZ
            .convert_to_compress_stream(input_stream)
            .unwrap();
        let compressed_data: Vec<Bytes> = compressed_stream.try_collect().await.unwrap();
        let store = InMemory::new();
        let path = Path::from("data.csv.xz");
        store
            .put(&path, PutPayload::from_iter(compressed_data))
            .await
            .unwrap();
        let url = Url::parse("memory://").unwrap();
        let ctx = SessionContext::new();
        ctx.register_object_store(&url, Arc::new(store));
        // The non-standard `.csv.xz` extension must be declared explicitly.
        let csv_options = CsvReadOptions::new()
            .has_header(true)
            .file_compression_type(FileCompressionType::XZ)
            .file_extension("csv.xz");
        ctx.register_csv("compressed", "memory:///data.csv.xz", csv_options)
            .await
            .unwrap();
        let result = ctx
            .sql("SELECT * FROM compressed")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        assert_eq!(batches_to_string(&result), EXPECTED_TABLE);
    }
}