blob: cab67601770ae264db27f350453a599a17099324 [file]
import logging
import apache_beam as beam
from apache_beam.transforms import DoFn
from framework import prism_options, dataflow_options
from framework import periodic_source
from framework import dump_to_log
from timestamp_buffer import TimestampBufferDoFnBag
# Windowing parameters for this example. Neither constant is referenced in
# the visible part of this file.
# NOTE(review): units are not stated here (presumably seconds) -- confirm.
WINDOW_SIZE = 6
SLIDE_INTERVAL = 6

# Request INFO-level output on the root logger for this script.
logging.basicConfig(level=logging.INFO)
class MyBufferDoFn(TimestampBufferDoFnBag):
    """Buffering DoFn that emits the buffered values plus the current one.

    For every call to ``process_element`` a single list is produced: item
    ``[1]`` of each entry in ``context``, in iteration order, followed by
    the current ``value``.
    """

    def process_element(self, key, element_ts, value, context,
                        **extra_state):
        """Yield one list combining the buffered entries with ``value``.

        Args:
            key: Key of the current element (not used here).
            element_ts: Timestamp of the current element (not used here).
            value: Payload of the current element; placed last in the output.
            context: Iterable of indexable entries; only item ``[1]`` of
                each entry is read.
                NOTE(review): presumably the (timestamp, value) pairs
                buffered by the base class -- confirm against
                TimestampBufferDoFnBag.
            **extra_state: Additional keyword arguments supplied by the
                caller; ignored here.

        Yields:
            A new list: ``entry[1]`` for each entry of ``context``, then
            ``value``.
        """
        collected = [entry[1] for entry in context]
        collected.append(value)
        yield collected
# Build the pipeline; the ``with`` block delegates running it to Beam's
# Pipeline context manager on exit.
with beam.Pipeline(options=prism_options) as pipeline:
    # Re-shape each source element ``x`` into (x[0], (timestamp, x[1])).
    # Beam fills ``t`` with the element's timestamp because its default is
    # DoFn.TimestampParam.
    timestamped = (
        pipeline
        | periodic_source
        | beam.Map(lambda x, t=DoFn.TimestampParam: (x[0], (t, x[1])))
    )

    # NOTE(review): the meaning of the literal 40 is defined by
    # TimestampBufferDoFnBag's constructor, which is not visible here --
    # confirm.
    buffered = timestamped | beam.ParDo(MyBufferDoFn(40))

    # Hand the buffered output to the project's logging sink.
    _ = buffered | dump_to_log