Merge pull request #11703 from ibzib/BEAM-9001
[BEAM-9001, BEAM-6327] Ensure that all transforms (except for require…
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 052892b..0f34722 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -191,14 +191,14 @@
self.windowed_coder)
]
self.splitting_lock = threading.Lock()
+ self.index = -1
+ self.stop = float('inf')
self.started = False
def start(self):
# type: () -> None
super(DataInputOperation, self).start()
with self.splitting_lock:
- self.index = -1
- self.stop = float('inf')
self.started = True
def process(self, windowed_value):
@@ -317,8 +317,15 @@
def finish(self):
# type: () -> None
with self.splitting_lock:
+ self.index += 1
self.started = False
+ def reset(self):
+ # type: () -> None
+ self.index = -1
+ self.stop = float('inf')
+ super(DataInputOperation, self).reset()
+
class _StateBackedIterable(object):
def __init__(self,