blob: 8cc12007de66c1f89d9b98e58aa54374161b55a8 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import py4j
from py4j.protocol import Py4JJavaError
class JavaException(Exception):
def __init__(self, stack_trace: str):
self.stack_trace = stack_trace
def __str__(self):
return self.stack_trace
class TableException(JavaException):
"""
General Exception for all errors during table handling.
"""
class CatalogException(JavaException):
"""
A catalog-related exception.
"""
class DatabaseAlreadyExistException(JavaException):
"""
Exception for trying to create a database that already exists.
"""
class DatabaseNotEmptyException(JavaException):
"""
Exception for trying to drop on a database that is not empty.
"""
class DatabaseNotExistException(JavaException):
"""
Exception for trying to operate on a database that doesn't exist.
"""
class FunctionAlreadyExistException(JavaException):
"""
Exception for trying to create a function that already exists.
"""
class FunctionNotExistException(JavaException):
"""
Exception for trying to operate on a function that doesn't exist.
"""
class PartitionAlreadyExistsException(JavaException):
"""
Exception for trying to create a partition that already exists.
"""
class PartitionNotExistException(JavaException):
"""
Exception for operation on a partition that doesn't exist. The cause includes non-existent
table, non-partitioned table, invalid partition spec, etc.
"""
class PartitionSpecInvalidException(JavaException):
"""
Exception for invalid PartitionSpec compared with partition key list of a partitioned Table.
For example, it is thrown when the size of PartitionSpec exceeds the size of partition key
list, or when the size of PartitionSpec is 'n' but its keys don't match the first 'n' keys in
partition key list.
"""
class TableAlreadyExistException(JavaException):
"""
Exception for trying to create a table (or view) that already exists.
"""
class TableNotExistException(JavaException):
"""
Exception for trying to operate on a table (or view) that doesn't exist.
"""
class TableNotPartitionedException(JavaException):
"""
Exception for trying to operate partition on a non-partitioned table.
"""
# Mapping from JavaException to PythonException
exception_mapping = {
"org.apache.flink.table.api.TableException":
TableException,
"org.apache.flink.table.catalog.exceptions.CatalogException":
CatalogException,
"org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException":
DatabaseAlreadyExistException,
"org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException":
DatabaseNotEmptyException,
"org.apache.flink.table.catalog.exceptions.DatabaseNotExistException":
DatabaseNotExistException,
"org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException":
FunctionAlreadyExistException,
"org.apache.flink.table.catalog.exceptions.FunctionNotExistException":
FunctionNotExistException,
"org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException":
PartitionAlreadyExistsException,
"org.apache.flink.table.catalog.exceptions.PartitionNotExistException":
PartitionNotExistException,
"org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException":
PartitionSpecInvalidException,
"org.apache.flink.table.catalog.exceptions.TableAlreadyExistException":
TableAlreadyExistException,
"org.apache.flink.table.catalog.exceptions.TableNotExistException":
TableNotExistException,
"org.apache.flink.table.catalog.exceptions.TableNotPartitionedException":
TableNotPartitionedException,
}
def capture_java_exception(f):
def deco(*a, **kw):
try:
return f(*a, **kw)
except Py4JJavaError as e:
from pyflink.java_gateway import get_gateway
get_gateway().jvm.org.apache.flink.client.python.PythonEnvUtils\
.setPythonException(e.java_exception)
s = e.java_exception.toString()
for exception in exception_mapping.keys():
if s.startswith(exception):
java_exception = convert_py4j_exception(e)
break
else:
raise
raise java_exception
return deco
def install_exception_handler():
"""
Hook an exception handler into Py4j, which could capture some exceptions in Java.
When calling Java API, it will call `get_return_value` to parse the returned object.
If any exception happened in JVM, the result will be a Java exception object, it raise
py4j.protocol.Py4JJavaError. We replace the original `get_return_value` with one that
could capture the Java exception and throw a Python one (with the same error message).
It's idempotent, could be called multiple times.
"""
original = py4j.protocol.get_return_value
# The original `get_return_value` is not patched, it's idempotent.
patched = capture_java_exception(original)
# only patch the one used in py4j.java_gateway (call Java API)
py4j.java_gateway.get_return_value = patched
def install_py4j_hooks():
"""
Hook the classes such as JavaPackage, etc of Py4j to improve the exception message.
"""
def wrapped_call(self, *args, **kwargs):
raise TypeError(
"Could not found the Java class '%s'. The Java dependencies could be specified via "
"command line argument '--jarfile' or the config option 'pipeline.jars'" % self._fqn)
setattr(py4j.java_gateway.JavaPackage, '__call__', wrapped_call)
def convert_py4j_exception(e: Py4JJavaError) -> JavaException:
"""
Convert Py4J exception to JavaException.
"""
s = e.java_exception.toString()
for exception in exception_mapping.keys():
if s.startswith(exception):
return exception_mapping[exception](str(e).split(': ', 1)[1])
else:
return JavaException(str(e).split(': ', 1)[1])