#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A streaming workflow that uses a synthetic streaming source.

This can only be used with the Flink portable runner.
"""

# pytype: skip-file
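
# Example invocation (illustrative), assuming a Beam job server for the Flink
# runner is already listening on localhost:8099 (the endpoint hard-coded in
# run() below):
#
#   python flink_streaming_impulse.py --count=100 --interval_ms=1000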

import argparse
import logging
import sys

import apache_beam as beam
from apache_beam.io.flink.flink_streaming_impulse_source import FlinkStreamingImpulseSource
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.transforms.trigger import AfterProcessingTime
from apache_beam.transforms.trigger import Repeatedly
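

# Parses a "key-N" string such as "foo-3" into a ("foo", 3) tuple; a helper
# that the pipeline below does not currently use.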
def split(s):
  a = s.split("-")
  return a[0], int(a[1])
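

# Sums the grouped values for a key: ('', [1, 1, ...]) -> ('', n).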
def count(x):
  return x[0], sum(x[1])
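

# Wraps an element in a TimestampedValue stamped with the current wall-clock
# time; not referenced by the pipeline below.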
def apply_timestamp(element):
  import time
  yield window.TimestampedValue(element, time.time())


def run(argv=None):
  """Build and run the pipeline."""
  # Default to the Flink portable runner, reached through a job endpoint at
  # localhost:8099; flags passed by the caller are appended after these
  # defaults and can override them.
  args = [
      "--runner=PortableRunner", "--job_endpoint=localhost:8099", "--streaming"
  ]
  if argv:
    args.extend(argv)

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--count',
      dest='count',
      default=0,
      type=int,
      help='Number of triggers to generate (0 means emit forever).')
  parser.add_argument(
      '--interval_ms',
      dest='interval_ms',
      default=500,
      type=int,
      help='Interval in milliseconds between records '
      'per parallel Flink subtask.')

  known_args, pipeline_args = parser.parse_known_args(args)

  pipeline_options = PipelineOptions(pipeline_args)

  with beam.Pipeline(options=pipeline_options) as p:
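    # The synthetic source emits one message per interval_ms on each parallel
    # Flink subtask, and stops after `count` messages (0 means emit forever).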
    messages = (
        p
        | FlinkStreamingImpulseSource().set_message_count(
            known_args.count).set_interval_ms(known_args.interval_ms))

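    # Map every message onto the same key and count occurrences under a
    # global window, using a processing-time trigger that fires repeatedly,
    # roughly every five seconds. DISCARDING mode resets the count after each
    # firing, so every log line reflects only the most recent pane.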
    _ = (
        messages
        | 'decode' >> beam.Map(lambda x: ('', 1))
        | 'window' >> beam.WindowInto(
            window.GlobalWindows(),
            trigger=Repeatedly(AfterProcessingTime(5 * 1000)),
            accumulation_mode=AccumulationMode.DISCARDING)
        | 'group' >> beam.GroupByKey()
        | 'count' >> beam.Map(count)
        | 'log' >> beam.Map(lambda x: logging.info("%d", x[1])))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run(sys.argv[1:])