| { |
| "cells": [ |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "view-in-github" |
| }, |
| "source": [ |
| "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master//Users/dcavazos/src/beam/examples/notebooks/documentation/transforms/python/elementwise/withtimestamps-py.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open in Colab\"/></a>" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "view-the-docs-top" |
| }, |
| "source": [ |
| "<table align=\"left\"><td><a target=\"_blank\" href=\"https://beam.apache.org/documentation/transforms/python/elementwise/withtimestamps\"><img src=\"https://beam.apache.org/images/logos/full-color/name-bottom/beam-logo-full-color-name-bottom-100.png\" width=\"32\" height=\"32\" />View the docs</a></td></table>" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "cellView": "form", |
| "id": "_-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "#@title Licensed under the Apache License, Version 2.0 (the \"License\")\n", |
| "# Licensed to the Apache Software Foundation (ASF) under one\n", |
| "# or more contributor license agreements. See the NOTICE file\n", |
| "# distributed with this work for additional information\n", |
| "# regarding copyright ownership. The ASF licenses this file\n", |
| "# to you under the Apache License, Version 2.0 (the\n", |
| "# \"License\"); you may not use this file except in compliance\n", |
| "# with the License. You may obtain a copy of the License at\n", |
| "#\n", |
| "# http://www.apache.org/licenses/LICENSE-2.0\n", |
| "#\n", |
| "# Unless required by applicable law or agreed to in writing,\n", |
| "# software distributed under the License is distributed on an\n", |
| "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", |
| "# KIND, either express or implied. See the License for the\n", |
| "# specific language governing permissions and limitations\n", |
| "# under the License." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "withtimestamps" |
| }, |
| "source": [ |
| "# WithTimestamps\n", |
| "\n", |
| "<script type=\"text/javascript\">\n", |
| "localStorage.setItem('language', 'language-py')\n", |
| "</script>\n", |
| "\n", |
| "Assigns timestamps to all the elements of a collection." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "setup" |
| }, |
| "source": [ |
| "## Setup\n", |
| "\n", |
| "To run a code cell, you can click the **Run cell** button at the top left of the cell,\n", |
| "or select it and press **`Shift+Enter`**.\n", |
| "Try modifying a code cell and re-running it to see what happens.\n", |
| "\n", |
| "> To learn more about Colab, see\n", |
| "> [Welcome to Colaboratory!](https://colab.sandbox.google.com/notebooks/welcome.ipynb).\n", |
| "\n", |
| "First, let's install the `apache-beam` module." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "setup-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "!pip install --quiet -U apache-beam" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "examples" |
| }, |
| "source": [ |
| "## Examples\n", |
| "\n", |
| "In the following examples, we create a pipeline with a `PCollection` and attach a timestamp value to each of its elements.\n", |
| "When windowing and late data play an important role in streaming pipelines, timestamps are especially useful." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "example-1-timestamp-by-event-time" |
| }, |
| "source": [ |
| "### Example 1: Timestamp by event time\n", |
| "\n", |
| "The elements themselves often already contain a timestamp field.\n", |
| "`beam.window.TimestampedValue` takes a value and a\n", |
| "[Unix timestamp](https://en.wikipedia.org/wiki/Unix_time)\n", |
| "in the form of seconds." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "example-1-timestamp-by-event-time-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "import apache_beam as beam\n", |
| "\n", |
| "class GetTimestamp(beam.DoFn):\n", |
| " def process(self, plant, timestamp=beam.DoFn.TimestampParam):\n", |
| " yield '{} - {}'.format(timestamp.to_utc_datetime(), plant['name'])\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| " plant_timestamps = (\n", |
| " pipeline\n", |
| " | 'Garden plants' >> beam.Create([\n", |
| " {'name': 'Strawberry', 'season': 1585699200}, # April, 2020\n", |
| " {'name': 'Carrot', 'season': 1590969600}, # June, 2020\n", |
| " {'name': 'Artichoke', 'season': 1583020800}, # March, 2020\n", |
| " {'name': 'Tomato', 'season': 1588291200}, # May, 2020\n", |
| " {'name': 'Potato', 'season': 1598918400}, # September, 2020\n", |
| " ])\n", |
| " | 'With timestamps' >> beam.Map(\n", |
| " lambda plant: beam.window.TimestampedValue(plant, plant['season']))\n", |
| " | 'Get timestamp' >> beam.ParDo(GetTimestamp())\n", |
| " | beam.Map(print)\n", |
| " )" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "example-1-timestamp-by-event-time-2" |
| }, |
| "source": [ |
| "<table align=\"left\" style=\"margin-right:1em\">\n", |
| " <td>\n", |
| " <a class=\"button\" target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/withtimestamps.py\"><img src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n", |
| " </td>\n", |
| "</table>\n", |
| "\n", |
| "<br/><br/><br/>\n", |
| "\n", |
| "To convert from a\n", |
| "[`time.struct_time`](https://docs.python.org/3/library/time.html#time.struct_time)\n", |
| "to `unix_time` you can use\n", |
| "[`time.mktime`](https://docs.python.org/3/library/time.html#time.mktime).\n", |
| "For more information on time formatting options, see\n", |
| "[`time.strftime`](https://docs.python.org/3/library/time.html#time.strftime)." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "example-1-timestamp-by-event-time-code-2" |
| }, |
| "outputs": [], |
| "source": [ |
| "import time\n", |
| "\n", |
| "time_tuple = time.strptime('2020-03-19 20:50:00', '%Y-%m-%d %H:%M:%S')\n", |
| "unix_time = time.mktime(time_tuple)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "example-1-timestamp-by-event-time-3" |
| }, |
| "source": [ |
| "To convert from a\n", |
| "[`datetime.datetime`](https://docs.python.org/3/library/datetime.html#datetime.datetime)\n", |
| "to `unix_time` you can use convert it to a `time.struct_time` first with\n", |
| "[`datetime.timetuple`](https://docs.python.org/3/library/datetime.html#datetime.datetime.timetuple)." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "example-1-timestamp-by-event-time-code-3" |
| }, |
| "outputs": [], |
| "source": [ |
| "import time\n", |
| "import datetime\n", |
| "\n", |
| "now = datetime.datetime.now()\n", |
| "time_tuple = now.timetuple()\n", |
| "unix_time = time.mktime(time_tuple)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "example-2-timestamp-by-logical-clock" |
| }, |
| "source": [ |
| "### Example 2: Timestamp by logical clock\n", |
| "\n", |
| "If each element has a chronological number, these numbers can be used as a\n", |
| "[logical clock](https://en.wikipedia.org/wiki/Logical_clock).\n", |
| "These numbers have to be converted to a *\"seconds\"* equivalent, which can be especially important depending on your windowing and late data rules." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "example-2-timestamp-by-logical-clock-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "import apache_beam as beam\n", |
| "\n", |
| "class GetTimestamp(beam.DoFn):\n", |
| " def process(self, plant, timestamp=beam.DoFn.TimestampParam):\n", |
| " event_id = int(timestamp.micros / 1e6) # equivalent to seconds\n", |
| " yield '{} - {}'.format(event_id, plant['name'])\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| " plant_events = (\n", |
| " pipeline\n", |
| " | 'Garden plants' >> beam.Create([\n", |
| " {'name': 'Strawberry', 'event_id': 1},\n", |
| " {'name': 'Carrot', 'event_id': 4},\n", |
| " {'name': 'Artichoke', 'event_id': 2},\n", |
| " {'name': 'Tomato', 'event_id': 3},\n", |
| " {'name': 'Potato', 'event_id': 5},\n", |
| " ])\n", |
| " | 'With timestamps' >> beam.Map(lambda plant: \\\n", |
| " beam.window.TimestampedValue(plant, plant['event_id']))\n", |
| " | 'Get timestamp' >> beam.ParDo(GetTimestamp())\n", |
| " | beam.Map(print)\n", |
| " )" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "example-2-timestamp-by-logical-clock-2" |
| }, |
| "source": [ |
| "<table align=\"left\" style=\"margin-right:1em\">\n", |
| " <td>\n", |
| " <a class=\"button\" target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/withtimestamps.py\"><img src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n", |
| " </td>\n", |
| "</table>\n", |
| "\n", |
| "<br/><br/><br/>" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "example-3-timestamp-by-processing-time" |
| }, |
| "source": [ |
| "### Example 3: Timestamp by processing time\n", |
| "\n", |
| "If the elements do not have any time data available, you can also use the current processing time for each element.\n", |
| "Note that this grabs the local time of the *worker* that is processing each element.\n", |
| "Workers might have time deltas, so using this method is not a reliable way to do precise ordering.\n", |
| "\n", |
| "By using processing time, there is no way of knowing if data is arriving late because the timestamp is attached when the element *enters* into the pipeline." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "example-3-timestamp-by-processing-time-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "import apache_beam as beam\n", |
| "import time\n", |
| "\n", |
| "class GetTimestamp(beam.DoFn):\n", |
| " def process(self, plant, timestamp=beam.DoFn.TimestampParam):\n", |
| " yield '{} - {}'.format(timestamp.to_utc_datetime(), plant['name'])\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| " plant_processing_times = (\n", |
| " pipeline\n", |
| " | 'Garden plants' >> beam.Create([\n", |
| " {'name': 'Strawberry'},\n", |
| " {'name': 'Carrot'},\n", |
| " {'name': 'Artichoke'},\n", |
| " {'name': 'Tomato'},\n", |
| " {'name': 'Potato'},\n", |
| " ])\n", |
| " | 'With timestamps' >> beam.Map(lambda plant: \\\n", |
| " beam.window.TimestampedValue(plant, time.time()))\n", |
| " | 'Get timestamp' >> beam.ParDo(GetTimestamp())\n", |
| " | beam.Map(print)\n", |
| " )" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "example-3-timestamp-by-processing-time-2" |
| }, |
| "source": [ |
| "<table align=\"left\" style=\"margin-right:1em\">\n", |
| " <td>\n", |
| " <a class=\"button\" target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/withtimestamps.py\"><img src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n", |
| " </td>\n", |
| "</table>\n", |
| "\n", |
| "<br/><br/><br/>" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "related-transforms" |
| }, |
| "source": [ |
| "## Related transforms\n", |
| "\n", |
| "* [Reify](https://beam.apache.org/documentation/transforms/python/elementwise/reify) converts between explicit and implicit forms of Beam values." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "view-the-docs-bottom" |
| }, |
| "source": [ |
| "<table align=\"left\"><td><a target=\"_blank\" href=\"https://beam.apache.org/documentation/transforms/python/elementwise/withtimestamps\"><img src=\"https://beam.apache.org/images/logos/full-color/name-bottom/beam-logo-full-color-name-bottom-100.png\" width=\"32\" height=\"32\" />View the docs</a></td></table>" |
| ] |
| } |
| ], |
| "metadata": { |
| "colab": { |
| "name": "WithTimestamps - element-wise transform", |
| "toc_visible": true |
| }, |
| "kernelspec": { |
| "display_name": "python3", |
| "name": "python3" |
| } |
| }, |
| "nbformat": 4, |
| "nbformat_minor": 2 |
| } |