blob: 67afa30f34e89c18e9fedd6b93cc71cf0c7518e0 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pyarrow as pa
class SQLContext:
"""SQL query context for Paimon tables.
Uses pypaimon-rust and DataFusion to execute SQL queries against Paimon tables.
Supports registering multiple catalogs for cross-catalog queries.
Example::
from pypaimon.sql import SQLContext
ctx = SQLContext()
ctx.register_catalog("paimon", {"warehouse": "/path/to/warehouse"})
ctx.set_current_catalog("paimon")
ctx.set_current_database("default")
result = ctx.sql("SELECT * FROM my_table")
"""
def __init__(self):
try:
from datafusion import SessionContext
except ImportError:
raise ImportError(
"datafusion is required for SQL query support. "
"Install it with: pip install pypaimon[sql]"
)
self._ctx = SessionContext()
def register_catalog(self, name, options):
"""Register a Paimon catalog as a DataFusion catalog provider.
Args:
name: The catalog name to register under.
options: A dict of catalog options (e.g. {"warehouse": "/path/to/warehouse"}).
"""
try:
from pypaimon_rust.datafusion import PaimonCatalog
except ImportError:
raise ImportError(
"pypaimon-rust is required for SQL query support. "
"Install it with: pip install pypaimon[sql]"
)
paimon_catalog = PaimonCatalog(options)
self._ctx.register_catalog_provider(name, paimon_catalog)
def set_current_catalog(self, catalog_name: str):
"""Set the default catalog for SQL queries."""
self._ctx.sql(f"SET datafusion.catalog.default_catalog = '{catalog_name}'")
def set_current_database(self, database: str):
"""Set the default database for SQL queries."""
self._ctx.sql(f"SET datafusion.catalog.default_schema = '{database}'")
def sql(self, query: str) -> pa.Table:
"""Execute a SQL query and return the result as a PyArrow Table."""
df = self._ctx.sql(query)
batches = df.collect()
if not batches:
return pa.Table.from_batches([], schema=df.schema())
return pa.Table.from_batches(batches)