blob: 53fa7e202a6a54222af4100c9d0662bd28dbc5f9 [file] [log] [blame]
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
from __future__ import print_function
import unittest
import mock
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from . import withtimestamps
def check_plant_timestamps(actual):
# [START plant_timestamps]
plant_timestamps = [
'2020-04-01 00:00:00 - Strawberry',
'2020-06-01 00:00:00 - Carrot',
'2020-03-01 00:00:00 - Artichoke',
'2020-05-01 00:00:00 - Tomato',
'2020-09-01 00:00:00 - Potato',
]
# [END plant_timestamps]
assert_that(actual, equal_to(plant_timestamps))
def check_plant_events(actual):
# [START plant_events]
plant_events = [
'1 - Strawberry',
'4 - Carrot',
'2 - Artichoke',
'3 - Tomato',
'5 - Potato',
]
# [END plant_events]
assert_that(actual, equal_to(plant_events))
def check_plant_processing_times(actual):
import apache_beam as beam
# [START plant_processing_times]
plant_processing_times = [
'2020-03-20 20:12:42.145594 - Strawberry',
'2020-03-20 20:12:42.145827 - Carrot',
'2020-03-20 20:12:42.145962 - Artichoke',
'2020-03-20 20:12:42.146093 - Tomato',
'2020-03-20 20:12:42.146216 - Potato',
]
# [END plant_processing_times]
# Since `time.time()` will always give something different, we'll
# simply strip the timestamp information before testing the results.
actual = actual | beam.Map(lambda row: row.split('-')[-1].strip())
expected = [row.split('-')[-1].strip() for row in plant_processing_times]
assert_that(actual, equal_to(expected))
@mock.patch('apache_beam.Pipeline', TestPipeline)
# pylint: disable=line-too-long
@mock.patch('apache_beam.examples.snippets.transforms.elementwise.withtimestamps.print', lambda elem: elem)
# pylint: enable=line-too-long
class WithTimestampsTest(unittest.TestCase):
def test_event_time(self):
withtimestamps.withtimestamps_event_time(check_plant_timestamps)
def test_logical_clock(self):
withtimestamps.withtimestamps_logical_clock(check_plant_events)
def test_processing_time(self):
withtimestamps.withtimestamps_processing_time(check_plant_processing_times)
def test_time_tuple2unix_time(self):
unix_time = withtimestamps.time_tuple2unix_time()
self.assertIsInstance(unix_time, float)
def test_datetime2unix_time(self):
unix_time = withtimestamps.datetime2unix_time()
self.assertIsInstance(unix_time, float)
if __name__ == '__main__':
unittest.main()