Merge pull request #11703 from ibzib/BEAM-9001

[BEAM-9001, BEAM-6327] Ensure that all transforms (except for require…
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 052892b..0f34722 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -191,14 +191,14 @@
             self.windowed_coder)
     ]
     self.splitting_lock = threading.Lock()
+    self.index = -1
+    self.stop = float('inf')
     self.started = False
 
   def start(self):
     # type: () -> None
     super(DataInputOperation, self).start()
     with self.splitting_lock:
-      self.index = -1
-      self.stop = float('inf')
       self.started = True
 
   def process(self, windowed_value):
@@ -317,8 +317,15 @@
   def finish(self):
     # type: () -> None
     with self.splitting_lock:
+      self.index += 1
       self.started = False
 
+  def reset(self):
+    # type: () -> None
+    self.index = -1
+    self.stop = float('inf')
+    super(DataInputOperation, self).reset()
+
 
 class _StateBackedIterable(object):
   def __init__(self,