{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<!--\n",
" Licensed to the Apache Software Foundation (ASF) under one\n",
" or more contributor license agreements. See the NOTICE file\n",
" distributed with this work for additional information\n",
" regarding copyright ownership. The ASF licenses this file\n",
" to you under the Apache License, Version 2.0 (the\n",
" \"License\"); you may not use this file except in compliance\n",
" with the License. You may obtain a copy of the License at\n",
"\n",
" http://www.apache.org/licenses/LICENSE-2.0\n",
"\n",
" Unless required by applicable law or agreed to in writing,\n",
" software distributed under the License is distributed on an\n",
" \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
" KIND, either express or implied. See the License for the\n",
" specific language governing permissions and limitations\n",
" under the License.\n",
"-->\n",
"\n",
"# Interactive Beam Examples"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from apache_beam.runners.interactive import interactive_runner"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Build the pipeline on the interactive runner; later cells read results back from it.\n",
"p = beam.Pipeline(interactive_runner.InteractiveRunner())\n",
"init_pcoll = p | beam.Create(range(10))\n",
"\n",
"# Two independent branches fan out from the same source PCollection.\n",
"squares = (\n",
"    init_pcoll\n",
"    | 'Square' >> beam.Map(lambda n: n * n))\n",
"cubes = (\n",
"    init_pcoll\n",
"    | 'Cube' >> beam.Map(lambda n: n ** 3))\n",
"\n",
"# Execute the pipeline and block until the run completes.\n",
"result = p.run()\n",
"result.wait_until_finish()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Install into the kernel's own environment (%pip, unlike !pip, targets the running kernel).\n",
"%pip install -q matplotlib\n",
"\n",
"%matplotlib inline\n",
"from matplotlib import pyplot as plt\n",
"\n",
"# Sorting lines each result list up with init_list, because x*x and x**3 are\n",
"# both increasing over 0..9; the order returned by result.get() is not relied upon.\n",
"init_list = list(range(10))\n",
"squares_list = sorted(result.get(squares))\n",
"cubes_list = sorted(result.get(cubes))\n",
"\n",
"# Explicit figure/axes so the plot carries its own title and axis labels.\n",
"fig, ax = plt.subplots()\n",
"ax.scatter(init_list, squares_list, label='squares', color='red')\n",
"ax.scatter(init_list, cubes_list, label='cubes', color='blue')\n",
"ax.set(title='Squares and cubes of 0-9', xlabel='input', ylabel='value')\n",
"ax.legend(loc='upper left')\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class AverageFn(beam.CombineFn):\n",
"  \"\"\"CombineFn that computes the arithmetic mean of its numeric inputs.\n",
"\n",
"  The accumulator is a ``(running_sum, count)`` tuple.\n",
"  \"\"\"\n",
"\n",
"  def create_accumulator(self):\n",
"    \"\"\"Return the empty accumulator: a zero sum and a zero count.\"\"\"\n",
"    return (0.0, 0)\n",
"\n",
"  def add_input(self, sum_count, input):\n",
"    \"\"\"Fold one element into the accumulator and return the updated pair.\"\"\"\n",
"    # Local names deliberately avoid shadowing the built-in sum().\n",
"    total, count = sum_count\n",
"    return total + input, count + 1\n",
"\n",
"  def merge_accumulators(self, accumulators):\n",
"    \"\"\"Combine several ``(sum, count)`` accumulators into a single one.\"\"\"\n",
"    # An explicit loop is used instead of zip(*accumulators), which cannot be\n",
"    # unpacked into two names when the iterable is empty.\n",
"    total, count = 0.0, 0\n",
"    for partial_sum, partial_count in accumulators:\n",
"      total += partial_sum\n",
"      count += partial_count\n",
"    return total, count\n",
"\n",
"  def extract_output(self, sum_count):\n",
"    \"\"\"Return the mean, or NaN when no elements were accumulated.\"\"\"\n",
"    total, count = sum_count\n",
"    return total / count if count else float('NaN')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"# Attach a global mean to each branch, using the AverageFn combiner.\n",
"average_square = (\n",
"    squares\n",
"    | 'Average Square' >> beam.CombineGlobally(AverageFn()))\n",
"average_cube = (\n",
"    cubes\n",
"    | 'Average Cube' >> beam.CombineGlobally(AverageFn()))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Run the pipeline again so the newly added combine steps execute, and block\n",
"# until the run completes, as the first run does.\n",
"result = p.run()\n",
"result.wait_until_finish()\n",
"\n",
"# Read the combined values back through the same accessor used for squares/cubes.\n",
"print('average square:', list(result.get(average_square)))\n",
"print('average cube:', list(result.get(average_cube)))"
]
}
],
"metadata": {
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.5rc1"
}
},
"nbformat": 4,
"nbformat_minor": 4
}