# blob: d736cf50fcee5ce0a59543d117ef5199b040cc9f [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Second in a series of four pipelines that tell a story in a 'gaming' domain.
In addition to the concepts introduced in `user_score`, new concepts include:
windowing and element timestamps; use of `Filter`; using standalone DoFns.
This pipeline processes data collected from gaming events in batch, building on
`user_score` but using fixed windows. It calculates the sum of scores per team,
for each window, optionally allowing specification of two timestamps before and
after which data is filtered out. This allows a model where late data collected
after the intended analysis window can be included, and any late-arriving data
prior to the beginning of the analysis window can be removed as well. By using
windowing and adding element timestamps, we can do finer-grained analysis than
with the `user_score` pipeline. However, our batch processing is high-latency,
in that we don't get results from plays at the beginning of the batch's time
period until the batch is processed.
Optionally include the `--input` argument to specify a batch input file. To
indicate a time after which the data should be filtered out, include the
`--stop_min` arg. E.g., `--stop_min=2015-10-18-23-59` indicates that any data
timestamped after 23:59 UTC on 2015-10-18 should not be included in the
analysis. To indicate a time before which data should be filtered out, include
the `--start_min` arg. If you're using the default input
"gs://apache-beam-samples/game/gaming_data*.csv", then
`--start_min=2015-11-16-16-10 --stop_min=2015-11-17-16-10` are good values.
For a description of the usage and options, use -h or --help.
To specify a different runner:
--runner YOUR_RUNNER
NOTE: When specifying a different runner, additional runner-specific options
may have to be passed in as well
EXAMPLES
--------
# DirectRunner
python hourly_team_score.py \
--project $PROJECT_ID \
--dataset $BIGQUERY_DATASET
# DataflowRunner
python hourly_team_score.py \
--project $PROJECT_ID \
--dataset $BIGQUERY_DATASET \
--runner DataflowRunner \
--temp_location gs://$BUCKET/user_score/temp
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import csv
import logging
import sys
import time
from datetime import datetime
import apache_beam as beam
from apache_beam.metrics.metric import Metrics
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
def str2timestamp(s, fmt='%Y-%m-%d-%H-%M'):
    """Converts a string into a unix timestamp.

    The string is parsed into a naive datetime and measured against the naive
    epoch 1970-01-01 00:00, so it is effectively interpreted as UTC.

    Args:
      s: Date/time string to parse.
      fmt: `strptime` format describing `s`.

    Returns:
      Seconds elapsed since the unix epoch, as a float.

    Raises:
      ValueError: If `s` does not match `fmt`.
    """
    # Build the epoch directly: `datetime.utcfromtimestamp` is deprecated as of
    # Python 3.12 and returns exactly this naive value for 0.
    epoch = datetime(1970, 1, 1)
    return (datetime.strptime(s, fmt) - epoch).total_seconds()
def timestamp2str(t, fmt='%Y-%m-%d %H:%M:%S.000'):
    """Renders a unix timestamp as a formatted string.

    The conversion goes through `datetime.fromtimestamp`, so the result is
    expressed in the local time zone of the machine running this code.

    Args:
      t: Seconds since the unix epoch.
      fmt: `strftime` format for the output; the default ends in a literal
        `.000` in place of real sub-second digits.

    Returns:
      The formatted date/time string.
    """
    moment = datetime.fromtimestamp(t)
    formatted = moment.strftime(fmt)
    return formatted
class ParseGameEventFn(beam.DoFn):
    """Parses the raw game event info into a Python dictionary.

    Each event line has the following format:
      username,teamname,score,timestamp_in_ms,readable_time

    e.g.:
      user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224

    The human-readable time string is not used here.

    Lines that cannot be parsed are logged, counted in the `num_parse_errors`
    metric, and otherwise dropped.
    """

    def __init__(self):
        super(ParseGameEventFn, self).__init__()
        self.num_parse_errors = Metrics.counter(
            self.__class__, 'num_parse_errors')

    def process(self, elem):
        """Yields one event dictionary for `elem`, or nothing if it is malformed.

        Args:
          elem: A single CSV-formatted line of game event data.

        Yields:
          A dictionary with keys 'user', 'team', 'score' (int) and 'timestamp'
          (float seconds, converted from the milliseconds in the input).
        """
        try:
            row = list(csv.reader([elem]))[0]
            event = {
                'user': row[0],
                'team': row[1],
                'score': int(row[2]),
                'timestamp': int(row[3]) / 1000.0,
            }
        except Exception:  # pylint: disable=broad-except
            # Parsing is best-effort: log and count the bad line, then move on.
            # Only `Exception` is caught so that interpreter-level signals
            # (KeyboardInterrupt, SystemExit, GeneratorExit) still propagate.
            self.num_parse_errors.inc()
            logging.error('Parse error on "%s"', elem)
        else:
            # Yield outside the `try` so that an exception raised at the yield
            # point (e.g. when the generator is closed early) is never
            # mistaken for a parse error.
            yield event
class ExtractAndSumScore(beam.PTransform):
    """Sums the scores of game events, grouped by one field of each event.

    The `field` constructor argument names the event entry used as the
    grouping key, i.e. whether 'team' or 'user' info is extracted.
    """

    def __init__(self, field):
        super(ExtractAndSumScore, self).__init__()
        self.field = field

    def expand(self, pcoll):
        # Pair every event's key (taken from the configured field) with its
        # score.
        keyed_scores = pcoll | beam.Map(
            lambda event: (event[self.field], event['score']))
        # Add up all scores that share a key.
        return keyed_scores | beam.CombinePerKey(sum)
class TeamScoresDict(beam.DoFn):
    """Turns (team, score) pairs into dictionaries of BigQuery column values.

    Each output dictionary has the format {'bigquery_column': value} and
    carries the team, its total score, the start of the window the element
    belongs to, and the wall-clock time at which it was processed.
    """

    def process(self, team_score, window=beam.DoFn.WindowParam):
        team, score = team_score
        # Both timestamps are truncated to whole seconds before formatting.
        window_start = timestamp2str(int(window.start))
        processed_at = timestamp2str(int(time.time()))
        row = {
            'team': team,
            'total_score': score,
            'window_start': window_start,
            'processing_time': processed_at,
        }
        yield row
class WriteToBigQuery(beam.PTransform):
    """Generate, format, and write BigQuery table row information."""

    def __init__(self, table_name, dataset, schema):
        """Initializes the transform.

        Args:
          table_name: Name of the BigQuery table to use.
          dataset: Name of the dataset to use.
          schema: Dictionary in the format {'column_name': 'bigquery_type'}
        """
        super(WriteToBigQuery, self).__init__()
        self.table_name = table_name
        self.dataset = dataset
        self.schema = schema

    def get_schema(self):
        """Build the output table schema as a 'name:type, name:type' string."""
        column_specs = [
            '%s:%s' % (name, bq_type) for name, bq_type in self.schema.items()
        ]
        return ', '.join(column_specs)

    def expand(self, pcoll):
        # The project comes from the pipeline's Google Cloud options.
        project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
        # Keep only the columns named in the schema; other keys are dropped.
        rows = pcoll | 'ConvertToRow' >> beam.Map(
            lambda elem: {col: elem[col] for col in self.schema})
        return rows | beam.io.WriteToBigQuery(
            self.table_name, self.dataset, project, self.get_schema())
# [START main]
class HourlyTeamScore(beam.PTransform):
    """Computes per-team score sums over fixed windows of event time.

    Raw event lines are parsed, restricted to the half-open time range
    [start_min, stop_min), stamped with their event time, assigned to fixed
    windows, and summed per team.
    """

    def __init__(self, start_min, stop_min, window_duration):
        """Initializes the transform.

        Args:
          start_min: Earliest event time to keep, as a 'yyyy-MM-dd-HH-mm'
            string. Events before this minute are dropped.
          stop_min: Cut-off event time, as a 'yyyy-MM-dd-HH-mm' string.
            Events at or after this minute are dropped.
          window_duration: Length of each fixed window, in minutes.
        """
        super(HourlyTeamScore, self).__init__()
        self.start_timestamp = str2timestamp(start_min)
        self.stop_timestamp = str2timestamp(stop_min)
        self.window_duration_in_seconds = window_duration * 60

    def expand(self, pcoll):
        return (
            pcoll
            | 'ParseGameEventFn' >> beam.ParDo(ParseGameEventFn())

            # Filter out data before and after the given times so that it is
            # not included in the calculations. As we collect data in batches
            # (say, by day), the batch for the day that we want to analyze
            # could potentially include some late-arriving data from the
            # previous day. If so, we want to weed it out. Similarly, if we
            # include data from the following day (to scoop up late-arriving
            # events from the day we're analyzing), we need to weed out events
            # that fall after the time period we want to analyze.
            # [START filter_by_time_range]
            # The lower bound is inclusive: only events strictly *before* the
            # start minute are excluded, giving the range [start, stop).
            | 'FilterStartTime' >> beam.Filter(
                lambda elem: elem['timestamp'] >= self.start_timestamp)
            | 'FilterEndTime' >> beam.Filter(
                lambda elem: elem['timestamp'] < self.stop_timestamp)
            # [END filter_by_time_range]

            # [START add_timestamp_and_window]
            # Add an element timestamp based on the event log, and apply fixed
            # windowing.
            | 'AddEventTimestamps' >> beam.Map(
                lambda elem: beam.window.TimestampedValue(
                    elem, elem['timestamp']))
            | 'FixedWindowsTeam' >> beam.WindowInto(
                beam.window.FixedWindows(self.window_duration_in_seconds))
            # [END add_timestamp_and_window]

            # Extract and sum teamname/score pairs from the event data.
            | 'ExtractAndSumScore' >> ExtractAndSumScore('team'))
def run(argv=None):
    """Main entry point; defines and runs the hourly_team_score pipeline.

    Args:
      argv: Command-line arguments to parse. When None, argparse falls back
        to `sys.argv[1:]`. Arguments not declared here are forwarded to
        `PipelineOptions`.

    Exits the process with status 1 if no `--project` option was supplied.
    """
    parser = argparse.ArgumentParser()

    # The default maps to two large Google Cloud Storage files (each ~12GB)
    # holding two subsequent day's worth (roughly) of data.
    parser.add_argument('--input',
                        type=str,
                        default='gs://apache-beam-samples/game/gaming_data*.csv',
                        help='Path to the data file(s) containing game data.')
    parser.add_argument('--dataset',
                        type=str,
                        required=True,
                        help='BigQuery Dataset to write tables to. '
                             'Must already exist.')
    parser.add_argument('--table_name',
                        default='leader_board',
                        help='The BigQuery table name. Should not already exist.')
    parser.add_argument('--window_duration',
                        type=int,
                        default=60,
                        help='Numeric value of fixed window duration, in minutes')
    parser.add_argument('--start_min',
                        type=str,
                        default='1970-01-01-00-00',
                        help='String representation of the first minute after '
                             'which to generate results in the format: '
                             'yyyy-MM-dd-HH-mm. Any input data timestamped '
                             'prior to that minute won\'t be included in the '
                             'sums.')
    parser.add_argument('--stop_min',
                        type=str,
                        default='2100-01-01-00-00',
                        help='String representation of the first minute for '
                             'which not to generate results in the format: '
                             'yyyy-MM-dd-HH-mm. Any input data timestamped '
                             'at or after that minute won\'t be included in '
                             'the sums.')

    args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)

    # We also require the --project option to access --dataset
    if options.view_as(GoogleCloudOptions).project is None:
        # Diagnostics belong on stderr so they are not mixed into regular
        # program output.
        parser.print_usage(sys.stderr)
        print(sys.argv[0] + ': error: argument --project is required',
              file=sys.stderr)
        sys.exit(1)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module
    # level).
    options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=options) as p:
        (p  # pylint: disable=expression-not-assigned
         | 'ReadInputText' >> beam.io.ReadFromText(args.input)
         | 'HourlyTeamScore' >> HourlyTeamScore(
             args.start_min, args.stop_min, args.window_duration)
         | 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
         | 'WriteTeamScoreSums' >> WriteToBigQuery(
             args.table_name, args.dataset, {
                 'team': 'STRING',
                 'total_score': 'INTEGER',
                 'window_start': 'STRING',
             }))
# [END main]
if __name__ == '__main__':
    # Raise the root logger to INFO, then launch the pipeline.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()