blob: 74575554ec0aff2e7f8248ab5250dcd9a8176eab [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
/// This example shows how to implement the DataFusion [`CatalogProvider`] API
/// for catalogs that are remote (require network access) and/or offer only
/// asynchronous APIs such as [Polaris], [Unity], and [Hive].
///
/// Integrating with this catalogs is a bit more complex than with local
/// catalogs because calls like `ctx.sql("SELECT * FROM db.schm.tbl")` may need
/// to perform remote network requests, but many Catalog APIs are synchronous.
/// See the documentation on [`CatalogProvider`] for more details.
///
/// [`CatalogProvider`]: datafusion_catalog::CatalogProvider
///
/// [Polaris]: https://github.com/apache/polaris
/// [Unity]: https://github.com/unitycatalog/unitycatalog
/// [Hive]: https://hive.apache.org/
use arrow::array::record_batch;
use arrow::datatypes::{Field, Fields, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::catalog::TableProvider;
use datafusion::catalog::{AsyncSchemaProvider, Session};
use datafusion::common::Result;
use datafusion::common::{assert_batches_eq, internal_datafusion_err, plan_err};
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::logical_expr::{Expr, TableType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::{DataFrame, SessionContext};
use futures::TryStreamExt;
use std::any::Any;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<()> {
// As always, we create a session context to interact with DataFusion
let ctx = SessionContext::new();
// Make a connection to the remote catalog, asynchronously, and configure it
let remote_catalog_interface = Arc::new(RemoteCatalogInterface::connect().await?);
// Create an adapter to provide the AsyncSchemaProvider interface to DataFusion
// based on our remote catalog interface
let remote_catalog_adapter = RemoteCatalogDatafusionAdapter(remote_catalog_interface);
// Here is a query that selects data from a table in the remote catalog.
let sql = "SELECT * from remote_schema.remote_table";
// The `SessionContext::sql` interface is async, but it does not
// support asynchronous access to catalogs, so we cannot register our schema provider
// directly and the following query fails to find our table.
let results = ctx.sql(sql).await;
assert_eq!(
results.unwrap_err().to_string(),
"Error during planning: table 'datafusion.remote_schema.remote_table' not found"
);
// Instead, to use a remote catalog, we must use lower level APIs on
// SessionState (what `SessionContext::sql` does internally).
let state = ctx.state();
// First, parse the SQL (but don't plan it / resolve any table references)
let dialect = state.config().options().sql_parser.dialect;
let statement = state.sql_to_statement(sql, &dialect)?;
// Find all `TableReferences` in the parsed queries. These correspond to the
// tables referred to by the query (in this case
// `remote_schema.remote_table`)
let references = state.resolve_table_references(&statement)?;
// Now we can asynchronously resolve the table references to get a cached catalog
// that we can use for our query
let resolved_catalog = remote_catalog_adapter
.resolve(&references, state.config(), "datafusion", "remote_schema")
.await?;
// This resolved catalog only makes sense for this query and so we create a clone
// of the session context with the resolved catalog
let query_ctx = ctx.clone();
query_ctx
.catalog("datafusion")
.ok_or_else(|| internal_datafusion_err!("default catalog was not installed"))?
.register_schema("remote_schema", resolved_catalog)?;
// We can now continue planning the query with this new query-specific context that
// contains our cached catalog
let query_state = query_ctx.state();
let plan = query_state.statement_to_plan(statement).await?;
let results = DataFrame::new(state, plan).collect().await?;
assert_batches_eq!(
[
"+----+-------+",
"| id | name |",
"+----+-------+",
"| 1 | alpha |",
"| 2 | beta |",
"| 3 | gamma |",
"+----+-------+",
],
&results
);
Ok(())
}
/// This is an example of an API that interacts with a remote catalog.
///
/// Specifically, its APIs are all `async` and thus can not be used by
/// [`SchemaProvider`] or [`TableProvider`] directly.
#[derive(Debug)]
struct RemoteCatalogInterface {}
impl RemoteCatalogInterface {
/// Establish a connection to the remote catalog
pub async fn connect() -> Result<Self> {
// In a real implementation this method might connect to a remote
// catalog, validate credentials, cache basic information, etc
Ok(Self {})
}
/// Fetches information for a specific table
pub async fn table_info(&self, name: &str) -> Result<Option<SchemaRef>> {
if name != "remote_table" {
return Ok(None);
}
// In this example, we'll model a remote table with columns "id" and
// "name"
//
// A real remote catalog would make a network call to fetch this
// information from a remote source.
let schema = Schema::new(Fields::from(vec![
Field::new("id", arrow::datatypes::DataType::Int32, false),
Field::new("name", arrow::datatypes::DataType::Utf8, false),
]));
Ok(Some(Arc::new(schema)))
}
/// Fetches data for a table from a remote data source
pub async fn read_data(&self, name: &str) -> Result<SendableRecordBatchStream> {
if name != "remote_table" {
return plan_err!("Remote table not found: {}", name);
}
// In a real remote catalog this call would likely perform network IO to
// open and begin reading from a remote datasource, prefetching
// information, etc.
//
// In this example we are just demonstrating how the API works so simply
// return back some static data as a stream.
let batch = record_batch!(
("id", Int32, [1, 2, 3]),
("name", Utf8, ["alpha", "beta", "gamma"])
)
.unwrap();
let schema = batch.schema();
let stream = futures::stream::iter([Ok(batch)]);
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
}
/// Implements an async version of the DataFusion SchemaProvider API for tables
/// stored in a remote catalog.
struct RemoteCatalogDatafusionAdapter(Arc<RemoteCatalogInterface>);
#[async_trait]
impl AsyncSchemaProvider for RemoteCatalogDatafusionAdapter {
async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
// Fetch information about the table from the remote catalog
//
// Note that a real remote catalog interface could return more
// information, but at the minimum, DataFusion requires the
// table's schema for planing.
Ok(self.0.table_info(name).await?.map(|schema| {
Arc::new(RemoteTable::new(Arc::clone(&self.0), name, schema))
as Arc<dyn TableProvider>
}))
}
}
/// Represents the information about a table retrieved from the remote catalog
#[derive(Debug)]
struct RemoteTable {
/// connection to the remote catalog
remote_catalog_interface: Arc<RemoteCatalogInterface>,
name: String,
schema: SchemaRef,
}
impl RemoteTable {
pub fn new(
remote_catalog_interface: Arc<RemoteCatalogInterface>,
name: impl Into<String>,
schema: SchemaRef,
) -> Self {
Self {
remote_catalog_interface,
name: name.into(),
schema,
}
}
}
/// Implement the DataFusion Catalog API for [`RemoteTable`]
#[async_trait]
impl TableProvider for RemoteTable {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn table_type(&self) -> TableType {
TableType::Base
}
async fn scan(
&self,
_state: &dyn Session,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// Note that `scan` is called once the plan begin execution, and thus is
// async. When interacting with remote data sources, this is the place
// to begin establishing the remote connections and interacting with the
// remote storage system.
//
// As this example is just modeling the catalog API interface, we buffer
// the results locally in memory for simplicity.
let batches = self
.remote_catalog_interface
.read_data(&self.name)
.await?
.try_collect()
.await?;
let exec = MemorySourceConfig::try_new_exec(
&[batches],
self.schema.clone(),
projection.cloned(),
)?;
Ok(exec)
}
}