blob: 79a9c44a2d5b225c2ac966a14d98a9c3a2f8dedd [file] [log] [blame]
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
from __future__ import print_function
def withtimestamps_event_time(test=None):
  """Attach each plant's ``season`` as its event-time timestamp and print it.

  Every input element carries a ``season`` value in Unix seconds. It is attached
  to the element with ``beam.window.TimestampedValue``. It is then read back in
  a DoFn through ``beam.DoFn.TimestampParam`` and printed as a UTC datetime
  next to the plant name.

  Args:
    test: Optional callable. When given, it is called with the printed
      PCollection from inside the ``with beam.Pipeline()`` block.
  """
  # [START withtimestamps_event_time]
  import apache_beam as beam

  class GetTimestamp(beam.DoFn):
    def process(self, plant, timestamp=beam.DoFn.TimestampParam):
      when = timestamp.to_utc_datetime()
      yield '{} - {}'.format(when, plant['name'])

  def to_event_time(plant):
    # Use the element's own 'season' field as its timestamp.
    return beam.window.TimestampedValue(plant, plant['season'])

  garden = [
      {'name': 'Strawberry', 'season': 1585699200},  # April, 2020
      {'name': 'Carrot', 'season': 1590969600},  # June, 2020
      {'name': 'Artichoke', 'season': 1583020800},  # March, 2020
      {'name': 'Tomato', 'season': 1588291200},  # May, 2020
      {'name': 'Potato', 'season': 1598918400},  # September, 2020
  ]

  with beam.Pipeline() as pipeline:
    plants = pipeline | 'Garden plants' >> beam.Create(garden)
    stamped = plants | 'With timestamps' >> beam.Map(to_event_time)
    plant_timestamps = (
        stamped
        | 'Get timestamp' >> beam.ParDo(GetTimestamp())
        | beam.Map(print))
    # [END withtimestamps_event_time]
    if test:
      test(plant_timestamps)
def withtimestamps_logical_clock(test=None):
  """Use each plant's integer ``event_id`` as its timestamp and print it back.

  The ``event_id`` values are attached directly as element timestamps with
  ``beam.window.TimestampedValue``. The timestamps therefore encode an ordering
  (a logical clock) rather than a wall-clock time. A DoFn reads each timestamp
  through ``beam.DoFn.TimestampParam``, converts it back to the integer id, and
  emits ``'<event_id> - <name>'``.

  Args:
    test: Optional callable. When given, it is called with the printed
      PCollection from inside the ``with beam.Pipeline()`` block.
  """
  # [START withtimestamps_logical_clock]
  import apache_beam as beam

  class GetTimestamp(beam.DoFn):
    def process(self, plant, timestamp=beam.DoFn.TimestampParam):
      # Integer floor division keeps the conversion exact. The previous
      # int(micros / 1e6) went through a float and could round large ids wrongly.
      event_id = timestamp.micros // 1000000  # equivalent to seconds
      yield '{} - {}'.format(event_id, plant['name'])

  with beam.Pipeline() as pipeline:
    plant_events = (
        pipeline
        | 'Garden plants' >> beam.Create([
            {'name': 'Strawberry', 'event_id': 1},
            {'name': 'Carrot', 'event_id': 4},
            {'name': 'Artichoke', 'event_id': 2},
            {'name': 'Tomato', 'event_id': 3},
            {'name': 'Potato', 'event_id': 5},
        ])
        | 'With timestamps' >> beam.Map(
            lambda plant: beam.window.TimestampedValue(plant, plant['event_id']))
        | 'Get timestamp' >> beam.ParDo(GetTimestamp())
        | beam.Map(print)
    )
    # [END withtimestamps_logical_clock]
    if test:
      test(plant_events)
def withtimestamps_processing_time(test=None):
  """Timestamp each plant with the wall-clock time it is processed, then print.

  The timestamp comes from ``time.time()`` at the moment each element passes
  through the 'With timestamps' step. The printed output therefore differs
  from run to run. Each element is printed as ``'<UTC datetime> - <name>'``.

  Args:
    test: Optional callable. When given, it is called with the printed
      PCollection from inside the ``with beam.Pipeline()`` block.
  """
  # [START withtimestamps_processing_time]
  import apache_beam as beam
  import time

  class GetTimestamp(beam.DoFn):
    def process(self, plant, timestamp=beam.DoFn.TimestampParam):
      processed_at = timestamp.to_utc_datetime()
      yield '{} - {}'.format(processed_at, plant['name'])

  def stamp_with_now(plant):
    # Current wall-clock time in seconds since the epoch.
    return beam.window.TimestampedValue(plant, time.time())

  names = ['Strawberry', 'Carrot', 'Artichoke', 'Tomato', 'Potato']
  garden = [{'name': name} for name in names]

  with beam.Pipeline() as pipeline:
    plants = pipeline | 'Garden plants' >> beam.Create(garden)
    stamped = plants | 'With timestamps' >> beam.Map(stamp_with_now)
    plant_processing_times = (
        stamped
        | 'Get timestamp' >> beam.ParDo(GetTimestamp())
        | beam.Map(print))
    # [END withtimestamps_processing_time]
    if test:
      test(plant_processing_times)
def time_tuple2unix_time():
  """Convert the fixed wall-clock time 2020-03-19 20:50:00 to Unix time.

  The string is parsed into a ``time.struct_time`` with ``time.strptime``.
  It is then converted with ``time.mktime``, which interprets it in the local
  timezone.

  Returns:
    Seconds since the epoch, as a float.
  """
  # [START time_tuple2unix_time]
  import time

  layout = '%Y-%m-%d %H:%M:%S'
  parsed = time.strptime('2020-03-19 20:50:00', layout)
  epoch_seconds = time.mktime(parsed)
  # [END time_tuple2unix_time]
  return epoch_seconds
def datetime2unix_time():
  """Return the current local time as Unix time.

  ``datetime.now()`` is turned into a ``time.struct_time`` and converted with
  ``time.mktime``. ``timetuple()`` carries no sub-second field, so the result
  is a float with no fractional part (whole seconds).
  """
  # [START datetime2unix_time]
  import time
  import datetime

  epoch_seconds = time.mktime(datetime.datetime.now().timetuple())
  # [END datetime2unix_time]
  return epoch_seconds