blob: 5afec24d752e8f3c7dcb9615d9c2a1d51a20e576 [file] [log] [blame]
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<!--\n",
" Licensed to the Apache Software Foundation (ASF) under one\n",
" or more contributor license agreements. See the NOTICE file\n",
" distributed with this work for additional information\n",
" regarding copyright ownership. The ASF licenses this file\n",
" to you under the Apache License, Version 2.0 (the\n",
" \"License\"); you may not use this file except in compliance\n",
" with the License. You may obtain a copy of the License at\n",
"\n",
" http://www.apache.org/licenses/LICENSE-2.0\n",
"\n",
" Unless required by applicable law or agreed to in writing,\n",
" software distributed under the License is distributed on an\n",
" \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
" KIND, either express or implied. See the License for the\n",
" specific language governing permissions and limitations\n",
" under the License.\n",
"-->\n",
"\n",
"# Interactive Beam Running on Flink"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from apache_beam.runners.interactive import interactive_runner\n",
"from apache_beam.runners.portability import flink_runner\n",
"\n",
"# Build the pipeline on an InteractiveRunner that wraps a FlinkRunner as its underlying runner.\n",
"p = beam.Pipeline(\n",
"    interactive_runner.InteractiveRunner(\n",
"        underlying_runner=flink_runner.FlinkRunner()))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# One source of the integers 0..9, fanned out into two independent branches.\n",
"init_pcoll = p | beam.Create(range(10))\n",
"squares = init_pcoll | 'Square' >> beam.Map(lambda n: n * n)\n",
"cubes = init_pcoll | 'Cube' >> beam.Map(lambda n: n ** 3)\n",
"\n",
"# Run the pipeline built so far and wait for it to finish.\n",
"result = p.run()\n",
"result.wait_until_finish()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Look up the `squares` PCollection on the run result; the value is shown as the cell output.\n",
"result.get(squares)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class AverageFn(beam.CombineFn):\n",
"    \"\"\"CombineFn that computes the arithmetic mean of its inputs.\n",
"\n",
"    The accumulator is a ``(running_total, element_count)`` tuple.\n",
"    \"\"\"\n",
"\n",
"    def create_accumulator(self):\n",
"        \"\"\"Return the empty accumulator: a zero total and a zero count.\"\"\"\n",
"        return (0.0, 0)\n",
"\n",
"    def add_input(self, sum_count, input):\n",
"        \"\"\"Fold one element into ``sum_count`` and return the updated pair.\"\"\"\n",
"        # Named `total` rather than `sum` so the builtin is not shadowed.\n",
"        total, count = sum_count\n",
"        return total + input, count + 1\n",
"\n",
"    def merge_accumulators(self, accumulators):\n",
"        \"\"\"Merge partial ``(total, count)`` accumulators into a single pair.\n",
"\n",
"        Iterates once instead of unpacking ``zip(*accumulators)``, so an empty\n",
"        iterable yields the empty accumulator instead of raising.\n",
"        \"\"\"\n",
"        total, count = 0.0, 0\n",
"        for partial_total, partial_count in accumulators:\n",
"            total += partial_total\n",
"            count += partial_count\n",
"        return total, count\n",
"\n",
"    def extract_output(self, sum_count):\n",
"        \"\"\"Return the mean, or NaN when no elements were accumulated.\"\"\"\n",
"        total, count = sum_count\n",
"        # Guard the division: a zero count means there is no mean to report.\n",
"        return total / count if count else float('NaN')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Extend the existing pipeline with a global average over each branch.\n",
"average_square = squares | 'Average Square' >> beam.CombineGlobally(AverageFn())\n",
"average_cube = cubes | 'Average Cube' >> beam.CombineGlobally(AverageFn())\n",
"\n",
"# Re-run and wait for completion, as the first run cell does, before the result is read.\n",
"result = p.run()\n",
"result.wait_until_finish()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Look up the `average_square` PCollection on the latest run result; the value is shown as the cell output.\n",
"result.get(average_square)"
]
}
],
"metadata": {
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.5rc1"
}
},
"nbformat": 4,
"nbformat_minor": 4
}