<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
To set up PyFlink with Apache Sedona, please follow the [PyFlink](../../setup/flink/install-python.md) installation guide.
Once the setup is complete, you can run the following code to verify that everything works.
```python
from sedona.flink import SedonaContext
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# Create a streaming execution environment and register Sedona's
# spatial functions on top of it.
stream_env = StreamExecutionEnvironment.get_execution_environment()
flink_settings = EnvironmentSettings.in_streaming_mode()
table_env = SedonaContext.create(stream_env, flink_settings)

# Run a simple query using a Sedona spatial function.
table_env.sql_query("SELECT ST_Point(1.0, 2.0)").execute()
```
PyFlink does not expose a way to convert Scala user-defined types (UDTs) into Python UDTs.
Therefore, when you want to collect results in Python, you need to use functions
such as `ST_AsText` or `ST_AsBinary` to convert the geometry to a string or binary representation first.
```python
from shapely.wkb import loads

# Collect the query result as WKB and deserialize it with Shapely.
result = table_env.sql_query("SELECT ST_AsBinary(ST_Point(1.0, 2.0))").execute().collect()
[loads(bytes(el[0])) for el in result]
```
```
[<POINT (1 2)>]
```
The same approach applies to user-defined scalar functions: pass the geometry as WKB, deserialize it with Shapely inside the function, and return WKB.
```python
from pyflink.table.udf import ScalarFunction, udf
from shapely.wkb import loads

class Buffer(ScalarFunction):
    def eval(self, s):
        # Deserialize the WKB input, buffer it, and return WKB again.
        geom = loads(s)
        return geom.buffer(1).wkb

table_env.create_temporary_function(
    "ST_BufferPython", udf(Buffer(), result_type="Binary")
)

buffer_table = table_env.sql_query(
    "SELECT ST_BufferPython(ST_AsBinary(ST_Point(1.0, 2.0))) AS buffer"
)
```
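To inspect the buffered geometry on the Python side, the table can be collected and deserialized the same way as before. A minimal sketch, assuming the `buffer_table` defined above:

```python
from shapely.wkb import loads

# Collect the buffered geometry as WKB and deserialize it with Shapely.
result = buffer_table.execute().collect()
[loads(bytes(el[0])) for el in result]
```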
For more SQL examples, please refer to the [FlinkSQL](sql.md) section.