.. ################################################################################
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
   ################################################################################

===========
Python REPL
===========

Flink comes with an integrated interactive Python Shell.
It can be used in a local setup as well as in a cluster setup.
See the :flinkdoc:`standalone resource provider page <docs/deployment/resource-providers/standalone/overview/>` for more information about how to set up a local Flink cluster.
You can also `build a local setup from source <https://github.com/apache/flink#building-apache-flink-from-source>`_.

.. note::
    The Python Shell runs the command ``python``. Please refer to the
    :flinkdoc:`First Steps guide <docs/getting-started/local_installation/>` for PyFlink installation instructions.

To use the shell with an integrated Flink cluster, you can simply install PyFlink from PyPI and execute the shell directly:

.. code-block:: bash

    # install PyFlink
    $ python -m pip install apache-flink
    # execute the shell
    $ pyflink-shell.sh local

To run the shell on a cluster, please see the Setup section below.

Usage
=====

The shell currently only supports the Table API.
The Table Environments are automatically prebound after startup.
Use ``bt_env`` and ``st_env`` to access the BatchTableEnvironment and StreamTableEnvironment respectively.

Table API
---------

The example below is a simple program in the Python shell:

**Streaming:**

.. code-block:: python

    >>> import tempfile
    >>> import os
    >>> import shutil
    >>> sink_path = tempfile.gettempdir() + '/streaming.csv'
    >>> if os.path.exists(sink_path):
    ...     if os.path.isfile(sink_path):
    ...         os.remove(sink_path)
    ...     else:
    ...         shutil.rmtree(sink_path)
    >>> s_env.set_parallelism(1)
    >>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
    >>> st_env.create_temporary_table("stream_sink", TableDescriptor.for_connector("filesystem")
    ...     .schema(Schema.new_builder()
    ...         .column("a", DataTypes.BIGINT())
    ...         .column("b", DataTypes.STRING())
    ...         .column("c", DataTypes.STRING())
    ...         .build())
    ...     .option("path", sink_path)
    ...     .format(FormatDescriptor.for_format("csv")
    ...         .option("field-delimiter", ",")
    ...         .build())
    ...     .build())
    >>> t.select("a + 1, b, c")\
    ...     .execute_insert("stream_sink").wait()
    >>> # If the job runs in local mode, you can execute the following code in the Python shell to see the result:
    >>> with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
    ...     print(f.read())

**Batch:**

.. code-block:: python

    >>> import tempfile
    >>> import os
    >>> import shutil
    >>> sink_path = tempfile.gettempdir() + '/batch.csv'
    >>> if os.path.exists(sink_path):
    ...     if os.path.isfile(sink_path):
    ...         os.remove(sink_path)
    ...     else:
    ...         shutil.rmtree(sink_path)
    >>> b_env.set_parallelism(1)
    >>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
    >>> bt_env.create_temporary_table("batch_sink", TableDescriptor.for_connector("filesystem")
    ...     .schema(Schema.new_builder()
    ...         .column("a", DataTypes.BIGINT())
    ...         .column("b", DataTypes.STRING())
    ...         .column("c", DataTypes.STRING())
    ...         .build())
    ...     .option("path", sink_path)
    ...     .format(FormatDescriptor.for_format("csv")
    ...         .option("field-delimiter", ",")
    ...         .build())
    ...     .build())
    >>> t.select("a + 1, b, c")\
    ...     .execute_insert("batch_sink").wait()
    >>> # If the job runs in local mode, you can execute the following code in the Python shell to see the result:
    >>> with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
    ...     print(f.read())

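The sink cleanup and read-back steps in the two examples above use only the Python standard library. As a minimal sketch, they can be factored into two reusable helpers (the names ``reset_sink`` and ``read_sink`` are illustrative, not part of PyFlink):

.. code-block:: python

    import os
    import shutil

    def reset_sink(path):
        # Remove a previous sink, whether Flink wrote it as a single
        # file or as a directory of part files.
        if os.path.isfile(path):
            os.remove(path)
        elif os.path.isdir(path):
            shutil.rmtree(path)

    def read_sink(path):
        # Concatenate the contents of all part files under a sink
        # directory, sorted for a deterministic order.
        chunks = []
        for name in sorted(os.listdir(path)):
            with open(os.path.join(path, name), 'r') as f:
                chunks.append(f.read())
        return ''.join(chunks)

In the shell, ``reset_sink(sink_path)`` would replace the ``if os.path.exists(...)`` block, and ``print(read_sink(sink_path))`` would print every part file instead of only the first one.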
Setup
=====

To get an overview of what options the Python Shell provides, please use

.. code-block:: bash

    pyflink-shell.sh --help

Local
-----

To use the shell with an integrated Flink cluster just execute:

.. code-block:: bash

    pyflink-shell.sh local

Remote
------

To use it with a running cluster, start the Python shell with the keyword ``remote``
and supply the host and port of the JobManager:

.. code-block:: bash

    pyflink-shell.sh remote <hostname> <portnumber>

Yarn Python Shell cluster
-------------------------

The shell can deploy a new Flink cluster to YARN, used exclusively by the
shell, and connect to it once it is up. You can also specify options for the
YARN cluster, such as the memory for the JobManager or the name of the YARN
application.

For example, to start a YARN cluster for the Python Shell with two TaskManagers,
use the following:

.. code-block:: bash

    pyflink-shell.sh yarn -n 2

For all other options, see the full reference at the bottom.

Yarn Session
------------

If you have previously deployed a Flink cluster using the Flink Yarn Session,
the Python shell can connect to it with the following command:

.. code-block:: bash

    pyflink-shell.sh yarn

Full Reference
==============

.. code-block:: text

    Flink Python Shell
    Usage: pyflink-shell.sh [local|remote|yarn] [options] <args>...

    Command: local [options]
    Starts Flink Python shell with a local Flink cluster
      usage:
       -h,--help   Show the help message with descriptions of all options.
    Command: remote [options] <host> <port>
    Starts Flink Python shell connecting to a remote cluster
      <host>
            Remote host name as string
      <port>
            Remote port as integer

      usage:
       -h,--help   Show the help message with descriptions of all options.
    Command: yarn [options]
    Starts Flink Python shell connecting to a yarn cluster
      usage:
       -h,--help                      Show the help message with descriptions of
                                      all options.
       -jm,--jobManagerMemory <arg>   Memory for JobManager Container with
                                      optional unit (default: MB)
       -nm,--name <arg>               Set a custom name for the application on
                                      YARN
       -qu,--queue <arg>              Specify YARN queue.
       -s,--slots <arg>               Number of slots per TaskManager
       -tm,--taskManagerMemory <arg>  Memory per TaskManager Container with
                                      optional unit (default: MB)
    -h | --help
          Prints this usage text