Port changes from Pub/Sub Lite to beam (#15418)

* Port all changes from the Pub/Sub Lite repo back to beam.

Beam will be the canonical source for this IO in the future per offline discussion.

Also add the other direction of helper to CloudPubsubTransforms and add an integration test.

* remove fixed TODO

* Fixes to ReadWriteIT to work around Create or DirectRunner bug.

* fix racy test
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 4cdf4ae..67fe8b8 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -446,7 +446,7 @@
     def errorprone_version = "2.3.4"
     def google_clients_version = "1.32.1"
     def google_cloud_bigdataoss_version = "2.2.2"
-    def google_cloud_pubsublite_version = "0.13.2"
+    def google_cloud_pubsublite_version = "1.0.4"
     def google_code_gson_version = "2.8.6"
     def google_oauth_clients_version = "1.31.0"
     // Try to keep grpc_version consistent with gRPC version in google_cloud_platform_libraries_bom
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java
index b0cc681..bf6a288 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java
@@ -24,25 +24,36 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 
 /** Common util functions for converting between PubsubMessage proto and {@link PubsubMessage}. */
-public class PubsubMessages {
+public final class PubsubMessages {
+  private PubsubMessages() {}
+
+  public static com.google.pubsub.v1.PubsubMessage toProto(PubsubMessage input) {
+    Map<String, String> attributes = input.getAttributeMap();
+    com.google.pubsub.v1.PubsubMessage.Builder message =
+        com.google.pubsub.v1.PubsubMessage.newBuilder()
+            .setData(ByteString.copyFrom(input.getPayload()));
+    // TODO(BEAM-8085) this should not be null
+    if (attributes != null) {
+      message.putAllAttributes(attributes);
+    }
+    String messageId = input.getMessageId();
+    if (messageId != null) {
+      message.setMessageId(messageId);
+    }
+    return message.build();
+  }
+
+  public static PubsubMessage fromProto(com.google.pubsub.v1.PubsubMessage input) {
+    return new PubsubMessage(
+        input.getData().toByteArray(), input.getAttributesMap(), input.getMessageId());
+  }
+
   // Convert the PubsubMessage to a PubsubMessage proto, then return its serialized representation.
   public static class ParsePayloadAsPubsubMessageProto
       implements SerializableFunction<PubsubMessage, byte[]> {
     @Override
     public byte[] apply(PubsubMessage input) {
-      Map<String, String> attributes = input.getAttributeMap();
-      com.google.pubsub.v1.PubsubMessage.Builder message =
-          com.google.pubsub.v1.PubsubMessage.newBuilder()
-              .setData(ByteString.copyFrom(input.getPayload()));
-      // TODO(BEAM-8085) this should not be null
-      if (attributes != null) {
-        message.putAllAttributes(attributes);
-      }
-      String messageId = input.getMessageId();
-      if (messageId != null) {
-        message.setMessageId(messageId);
-      }
-      return message.build().toByteArray();
+      return toProto(input).toByteArray();
     }
   }
 
@@ -54,8 +65,7 @@
       try {
         com.google.pubsub.v1.PubsubMessage message =
             com.google.pubsub.v1.PubsubMessage.parseFrom(input);
-        return new PubsubMessage(
-            message.getData().toByteArray(), message.getAttributesMap(), message.getMessageId());
+        return fromProto(message);
       } catch (InvalidProtocolBufferException e) {
         throw new RuntimeException("Could not decode Pubsub message", e);
       }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubChecks.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubChecks.java
deleted file mode 100644
index 6dc1516..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubChecks.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer;
-
-import com.google.cloud.pubsublite.Message;
-import com.google.cloud.pubsublite.proto.PubSubMessage;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/**
- * A class providing a conversion validity check between Cloud Pub/Sub and Pub/Sub Lite message
- * types.
- */
-public final class CloudPubsubChecks {
-  private CloudPubsubChecks() {}
-
-  /**
-   * Ensure that all messages that pass through can be converted to Cloud Pub/Sub messages using the
-   * standard transformation methods in the client library.
-   *
-   * <p>Will fail the pipeline if a message has multiple attributes per key.
-   */
-  public static PTransform<PCollection<? extends PubSubMessage>, PCollection<PubSubMessage>>
-      ensureUsableAsCloudPubsub() {
-    return MapElements.into(TypeDescriptor.of(PubSubMessage.class))
-        .via(
-            message -> {
-              Object unused = toCpsPublishTransformer().transform(Message.fromProto(message));
-              return message;
-            });
-  }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java
new file mode 100644
index 0000000..1140c11
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer;
+import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer;
+import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor;
+import com.google.cloud.pubsublite.proto.PubSubMessage;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message types. */
+public final class CloudPubsubTransforms {
+  private CloudPubsubTransforms() {}
+  /**
+   * Ensure that all messages that pass through can be converted to Cloud Pub/Sub messages using the
+   * standard transformation methods in the client library.
+   *
+   * <p>Will fail the pipeline if a message has multiple attributes per key.
+   */
+  public static PTransform<PCollection<PubSubMessage>, PCollection<PubSubMessage>>
+      ensureUsableAsCloudPubsub() {
+    return new PTransform<PCollection<PubSubMessage>, PCollection<PubSubMessage>>() {
+      @Override
+      public PCollection<PubSubMessage> expand(PCollection<PubSubMessage> input) {
+        return input.apply(
+            MapElements.into(TypeDescriptor.of(PubSubMessage.class))
+                .via(
+                    message -> {
+                      Object unused =
+                          toCpsPublishTransformer().transform(Message.fromProto(message));
+                      return message;
+                    }));
+      }
+    };
+  }
+
+  /**
+   * Transform messages read from Pub/Sub Lite to their equivalent Cloud Pub/Sub Message that would
+   * have been read from PubsubIO.
+   *
+   * <p>Will fail the pipeline if a message has multiple attributes per map key.
+   */
+  public static PTransform<PCollection<SequencedMessage>, PCollection<PubsubMessage>>
+      toCloudPubsubMessages() {
+    return new PTransform<PCollection<SequencedMessage>, PCollection<PubsubMessage>>() {
+      @Override
+      public PCollection<PubsubMessage> expand(PCollection<SequencedMessage> input) {
+        return input.apply(
+            MapElements.into(TypeDescriptor.of(PubsubMessage.class))
+                .via(
+                    message ->
+                        PubsubMessages.fromProto(
+                            toCpsSubscribeTransformer()
+                                .transform(
+                                    com.google.cloud.pubsublite.SequencedMessage.fromProto(
+                                        message)))));
+      }
+    };
+  }
+
+  /**
+   * Transform messages publishable using PubsubIO to their equivalent Pub/Sub Lite publishable
+   * message.
+   */
+  public static PTransform<PCollection<PubsubMessage>, PCollection<PubSubMessage>>
+      fromCloudPubsubMessages() {
+    return new PTransform<PCollection<PubsubMessage>, PCollection<PubSubMessage>>() {
+      @Override
+      public PCollection<PubSubMessage> expand(PCollection<PubsubMessage> input) {
+        return input.apply(
+            MapElements.into(TypeDescriptor.of(PubSubMessage.class))
+                .via(
+                    message ->
+                        fromCpsPublishTransformer(KeyExtractor.DEFAULT)
+                            .transform(PubsubMessages.toProto(message))
+                            .toProto()));
+      }
+    };
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactory.java
new file mode 100644
index 0000000..de0cf43
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import java.io.Serializable;
+
+/**
+ * A ManagedBacklogReaderFactory produces TopicBacklogReaders and tears down any produced readers
+ * when it is itself closed.
+ *
+ * <p>close() should never be called on produced readers.
+ */
+public interface ManagedBacklogReaderFactory extends AutoCloseable, Serializable {
+  TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition);
+
+  @Override
+  void close();
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactoryImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactoryImpl.java
new file mode 100644
index 0000000..9a337bf
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactoryImpl.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+
+public class ManagedBacklogReaderFactoryImpl implements ManagedBacklogReaderFactory {
+  private final SerializableFunction<SubscriptionPartition, TopicBacklogReader> newReader;
+
+  @GuardedBy("this")
+  private final Map<SubscriptionPartition, TopicBacklogReader> readers = new HashMap<>();
+
+  ManagedBacklogReaderFactoryImpl(
+      SerializableFunction<SubscriptionPartition, TopicBacklogReader> newReader) {
+    this.newReader = newReader;
+  }
+
+  private static final class NonCloseableTopicBacklogReader implements TopicBacklogReader {
+    private final TopicBacklogReader underlying;
+
+    NonCloseableTopicBacklogReader(TopicBacklogReader underlying) {
+      this.underlying = underlying;
+    }
+
+    @Override
+    public ComputeMessageStatsResponse computeMessageStats(Offset offset) throws ApiException {
+      return underlying.computeMessageStats(offset);
+    }
+
+    @Override
+    public void close() {
+      throw new IllegalArgumentException(
+          "Cannot call close() on a reader returned from ManagedBacklogReaderFactory.");
+    }
+  }
+
+  @Override
+  public synchronized TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition) {
+    return new NonCloseableTopicBacklogReader(
+        readers.computeIfAbsent(subscriptionPartition, newReader::apply));
+  }
+
+  @Override
+  public synchronized void close() {
+    readers.values().forEach(TopicBacklogReader::close);
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRange.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRange.java
new file mode 100644
index 0000000..b39d87e
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRange.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.io.range.OffsetRange;
+
+@AutoValue
+@DefaultCoder(OffsetByteRangeCoder.class)
+abstract class OffsetByteRange {
+  abstract OffsetRange getRange();
+
+  abstract long getByteCount();
+
+  static OffsetByteRange of(OffsetRange range, long byteCount) {
+    return new AutoValue_OffsetByteRange(range, byteCount);
+  }
+
+  static OffsetByteRange of(OffsetRange range) {
+    return of(range, 0);
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeCoder.java
new file mode 100644
index 0000000..076cda1
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeCoder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.coders.DelegateCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class OffsetByteRangeCoder extends AtomicCoder<OffsetByteRange> {
+  private static final Coder<OffsetByteRange> CODER =
+      DelegateCoder.of(
+          KvCoder.of(OffsetRange.Coder.of(), VarLongCoder.of()),
+          OffsetByteRangeCoder::toKv,
+          OffsetByteRangeCoder::fromKv);
+
+  private static KV<OffsetRange, Long> toKv(OffsetByteRange value) {
+    return KV.of(value.getRange(), value.getByteCount());
+  }
+
+  private static OffsetByteRange fromKv(KV<OffsetRange, Long> kv) {
+    return OffsetByteRange.of(kv.getKey(), kv.getValue());
+  }
+
+  @Override
+  public void encode(OffsetByteRange value, OutputStream outStream) throws IOException {
+    CODER.encode(value, outStream);
+  }
+
+  @Override
+  public OffsetByteRange decode(InputStream inStream) throws IOException {
+    return CODER.decode(inStream);
+  }
+
+  public static CoderProvider getCoderProvider() {
+    return CoderProviders.forCoder(
+        TypeDescriptor.of(OffsetByteRange.class), new OffsetByteRangeCoder());
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java
index 608af8f..da9aaaa 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java
@@ -26,8 +26,6 @@
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.range.OffsetRange;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
 import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
 import org.joda.time.Duration;
@@ -44,24 +42,27 @@
  * received. IMPORTANT: minTrackingTime must be strictly smaller than the SDF read timeout when it
  * would return ProcessContinuation.resume().
  */
-class OffsetByteRangeTracker extends RestrictionTracker<OffsetRange, OffsetByteProgress>
-    implements HasProgress {
-  private final TopicBacklogReader backlogReader;
+class OffsetByteRangeTracker extends TrackerWithProgress {
+  private final TopicBacklogReader unownedBacklogReader;
   private final Duration minTrackingTime;
   private final long minBytesReceived;
   private final Stopwatch stopwatch;
-  private OffsetRange range;
+  private OffsetByteRange range;
   private @Nullable Long lastClaimed;
-  private long byteCount = 0;
 
   public OffsetByteRangeTracker(
-      OffsetRange range,
-      TopicBacklogReader backlogReader,
+      OffsetByteRange range,
+      TopicBacklogReader unownedBacklogReader,
       Stopwatch stopwatch,
       Duration minTrackingTime,
       long minBytesReceived) {
-    checkArgument(range.getTo() == Long.MAX_VALUE);
-    this.backlogReader = backlogReader;
+    checkArgument(
+        range.getRange().getTo() == Long.MAX_VALUE,
+        "May only construct OffsetByteRangeTracker with an unbounded range with no progress.");
+    checkArgument(
+        range.getByteCount() == 0L,
+        "May only construct OffsetByteRangeTracker with an unbounded range with no progress.");
+    this.unownedBacklogReader = unownedBacklogReader;
     this.minTrackingTime = minTrackingTime;
     this.minBytesReceived = minBytesReceived;
     this.stopwatch = stopwatch.reset().start();
@@ -69,11 +70,6 @@
   }
 
   @Override
-  public void finalize() {
-    this.backlogReader.close();
-  }
-
-  @Override
   public IsBounded isBounded() {
     return IsBounded.UNBOUNDED;
   }
@@ -87,32 +83,32 @@
         position.lastOffset().value(),
         lastClaimed);
     checkArgument(
-        toClaim >= range.getFrom(),
+        toClaim >= range.getRange().getFrom(),
         "Trying to claim offset %s before start of the range %s",
         toClaim,
         range);
     // split() has already been called, truncating this range. No more offsets may be claimed.
-    if (range.getTo() != Long.MAX_VALUE) {
-      boolean isRangeEmpty = range.getTo() == range.getFrom();
-      boolean isValidClosedRange = nextOffset() == range.getTo();
+    if (range.getRange().getTo() != Long.MAX_VALUE) {
+      boolean isRangeEmpty = range.getRange().getTo() == range.getRange().getFrom();
+      boolean isValidClosedRange = nextOffset() == range.getRange().getTo();
       checkState(
           isRangeEmpty || isValidClosedRange,
           "Violated class precondition: offset range improperly split. Please report a beam bug.");
       return false;
     }
     lastClaimed = toClaim;
-    byteCount += position.batchBytes();
+    range = OffsetByteRange.of(range.getRange(), range.getByteCount() + position.batchBytes());
     return true;
   }
 
   @Override
-  public OffsetRange currentRestriction() {
+  public OffsetByteRange currentRestriction() {
     return range;
   }
 
   private long nextOffset() {
     checkState(lastClaimed == null || lastClaimed < Long.MAX_VALUE);
-    return lastClaimed == null ? currentRestriction().getFrom() : lastClaimed + 1;
+    return lastClaimed == null ? currentRestriction().getRange().getFrom() : lastClaimed + 1;
   }
 
   /**
@@ -124,29 +120,33 @@
     if (duration.isLongerThan(minTrackingTime)) {
       return true;
     }
-    if (byteCount >= minBytesReceived) {
+    if (currentRestriction().getByteCount() >= minBytesReceived) {
       return true;
     }
     return false;
   }
 
   @Override
-  public @Nullable SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
+  public @Nullable SplitResult<OffsetByteRange> trySplit(double fractionOfRemainder) {
     // Cannot split a bounded range. This should already be completely claimed.
-    if (range.getTo() != Long.MAX_VALUE) {
+    if (range.getRange().getTo() != Long.MAX_VALUE) {
       return null;
     }
     if (!receivedEnough()) {
       return null;
     }
-    range = new OffsetRange(currentRestriction().getFrom(), nextOffset());
-    return SplitResult.of(this.range, new OffsetRange(nextOffset(), Long.MAX_VALUE));
+    range =
+        OffsetByteRange.of(
+            new OffsetRange(currentRestriction().getRange().getFrom(), nextOffset()),
+            range.getByteCount());
+    return SplitResult.of(
+        this.range, OffsetByteRange.of(new OffsetRange(nextOffset(), Long.MAX_VALUE), 0));
   }
 
   @Override
   @SuppressWarnings("unboxing.of.nullable")
   public void checkDone() throws IllegalStateException {
-    if (range.getFrom() == range.getTo()) {
+    if (range.getRange().getFrom() == range.getRange().getTo()) {
       return;
     }
     checkState(
@@ -155,18 +155,18 @@
         range);
     long lastClaimedNotNull = checkNotNull(lastClaimed);
     checkState(
-        lastClaimedNotNull >= range.getTo() - 1,
+        lastClaimedNotNull >= range.getRange().getTo() - 1,
         "Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted",
         lastClaimedNotNull,
         range,
         lastClaimedNotNull + 1,
-        range.getTo());
+        range.getRange().getTo());
   }
 
   @Override
   public Progress getProgress() {
     ComputeMessageStatsResponse stats =
-        this.backlogReader.computeMessageStats(Offset.of(nextOffset()));
-    return Progress.from(byteCount, stats.getMessageBytes());
+        this.unownedBacklogReader.computeMessageStats(Offset.of(nextOffset()));
+    return Progress.from(range.getByteCount(), stats.getMessageBytes());
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java
index 623e20c..d7526d8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java
@@ -27,4 +27,8 @@
   private PerServerPublisherCache() {}
 
   static final PublisherCache PUBLISHER_CACHE = new PublisherCache();
+
+  static {
+    Runtime.getRuntime().addShutdownHook(new Thread(PUBLISHER_CACHE::close));
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java
index a9f7a43..fdf7920 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java
@@ -17,13 +17,12 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite;
 
-import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
-import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;
 
 import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.internal.ExtractStatus;
 import com.google.cloud.pubsublite.internal.wire.Committer;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
-import java.util.concurrent.ExecutionException;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SerializableBiFunction;
@@ -35,31 +34,35 @@
 
 class PerSubscriptionPartitionSdf extends DoFn<SubscriptionPartition, SequencedMessage> {
   private final Duration maxSleepTime;
+  private final ManagedBacklogReaderFactory backlogReaderFactory;
   private final SubscriptionPartitionProcessorFactory processorFactory;
   private final SerializableFunction<SubscriptionPartition, InitialOffsetReader>
       offsetReaderFactory;
-  private final SerializableBiFunction<
-          SubscriptionPartition, OffsetRange, RestrictionTracker<OffsetRange, OffsetByteProgress>>
+  private final SerializableBiFunction<TopicBacklogReader, OffsetByteRange, TrackerWithProgress>
       trackerFactory;
   private final SerializableFunction<SubscriptionPartition, Committer> committerFactory;
 
   PerSubscriptionPartitionSdf(
       Duration maxSleepTime,
+      ManagedBacklogReaderFactory backlogReaderFactory,
       SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory,
-      SerializableBiFunction<
-              SubscriptionPartition,
-              OffsetRange,
-              RestrictionTracker<OffsetRange, OffsetByteProgress>>
+      SerializableBiFunction<TopicBacklogReader, OffsetByteRange, TrackerWithProgress>
           trackerFactory,
       SubscriptionPartitionProcessorFactory processorFactory,
       SerializableFunction<SubscriptionPartition, Committer> committerFactory) {
     this.maxSleepTime = maxSleepTime;
+    this.backlogReaderFactory = backlogReaderFactory;
     this.processorFactory = processorFactory;
     this.offsetReaderFactory = offsetReaderFactory;
     this.trackerFactory = trackerFactory;
     this.committerFactory = committerFactory;
   }
 
+  @Teardown
+  public void teardown() {
+    backlogReaderFactory.close();
+  }
+
   @GetInitialWatermarkEstimatorState
   public Instant getInitialWatermarkState() {
     return Instant.EPOCH;
@@ -72,7 +75,7 @@
 
   @ProcessElement
   public ProcessContinuation processElement(
-      RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
+      RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
       @Element SubscriptionPartition subscriptionPartition,
       OutputReceiver<SequencedMessage> receiver)
       throws Exception {
@@ -83,38 +86,44 @@
       processor
           .lastClaimed()
           .ifPresent(
-              lastClaimedOffset ->
-              /* TODO(boyuanzz): When default dataflow can use finalizers, undo this.
-              finalizer.afterBundleCommit(
-                  Instant.ofEpochMilli(Long.MAX_VALUE),
-                  () -> */ {
+              lastClaimedOffset -> {
                 Committer committer = committerFactory.apply(subscriptionPartition);
                 committer.startAsync().awaitRunning();
                 // Commit the next-to-deliver offset.
                 try {
                   committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();
-                } catch (ExecutionException e) {
-                  throw toCanonical(checkArgumentNotNull(e.getCause())).underlying;
                 } catch (Exception e) {
-                  throw toCanonical(e).underlying;
+                  throw ExtractStatus.toCanonical(e).underlying;
                 }
-                committer.stopAsync().awaitTerminated();
+                blockingShutdown(committer);
               });
       return result;
     }
   }
 
   @GetInitialRestriction
-  public OffsetRange getInitialRestriction(@Element SubscriptionPartition subscriptionPartition) {
+  public OffsetByteRange getInitialRestriction(
+      @Element SubscriptionPartition subscriptionPartition) {
     try (InitialOffsetReader reader = offsetReaderFactory.apply(subscriptionPartition)) {
       Offset offset = reader.read();
-      return new OffsetRange(offset.value(), Long.MAX_VALUE /* open interval */);
+      return OffsetByteRange.of(
+          new OffsetRange(offset.value(), Long.MAX_VALUE /* open interval */));
     }
   }
 
   @NewTracker
-  public RestrictionTracker<OffsetRange, OffsetByteProgress> newTracker(
-      @Element SubscriptionPartition subscriptionPartition, @Restriction OffsetRange range) {
-    return trackerFactory.apply(subscriptionPartition, range);
+  public TrackerWithProgress newTracker(
+      @Element SubscriptionPartition subscriptionPartition, @Restriction OffsetByteRange range) {
+    return trackerFactory.apply(backlogReaderFactory.newReader(subscriptionPartition), range);
+  }
+
+  @GetSize
+  public double getSize(
+      @Element SubscriptionPartition subscriptionPartition,
+      @Restriction OffsetByteRange restriction) {
+    if (restriction.getRange().getTo() != Long.MAX_VALUE) {
+      return restriction.getByteCount();
+    }
+    return newTracker(subscriptionPartition, restriction).getProgress().getWorkRemaining();
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java
index f8dc24b..3dbdec6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java
@@ -23,52 +23,50 @@
 import com.google.api.core.ApiService.State;
 import com.google.api.gax.rpc.ApiException;
 import com.google.cloud.pubsublite.MessageMetadata;
-import com.google.cloud.pubsublite.internal.CloseableMonitor;
 import com.google.cloud.pubsublite.internal.Publisher;
+import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import java.util.HashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 
 /** A map of working publishers by PublisherOptions. */
-class PublisherCache {
-  private final CloseableMonitor monitor = new CloseableMonitor();
-
-  private final Executor listenerExecutor = Executors.newSingleThreadExecutor();
-
-  @GuardedBy("monitor.monitor")
+class PublisherCache implements AutoCloseable {
+  @GuardedBy("this")
   private final HashMap<PublisherOptions, Publisher<MessageMetadata>> livePublishers =
       new HashMap<>();
 
-  Publisher<MessageMetadata> get(PublisherOptions options) throws ApiException {
+  private synchronized void evict(PublisherOptions options) {
+    livePublishers.remove(options);
+  }
+
+  synchronized Publisher<MessageMetadata> get(PublisherOptions options) throws ApiException {
     checkArgument(options.usesCache());
-    try (CloseableMonitor.Hold h = monitor.enter()) {
-      Publisher<MessageMetadata> publisher = livePublishers.get(options);
-      if (publisher != null) {
-        return publisher;
-      }
-      publisher = Publishers.newPublisher(options);
-      livePublishers.put(options, publisher);
-      publisher.addListener(
-          new Listener() {
-            @Override
-            public void failed(State s, Throwable t) {
-              try (CloseableMonitor.Hold h = monitor.enter()) {
-                livePublishers.remove(options);
-              }
-            }
-          },
-          listenerExecutor);
-      publisher.startAsync().awaitRunning();
+    Publisher<MessageMetadata> publisher = livePublishers.get(options);
+    if (publisher != null) {
       return publisher;
     }
+    publisher = Publishers.newPublisher(options);
+    livePublishers.put(options, publisher);
+    publisher.addListener(
+        new Listener() {
+          @Override
+          public void failed(State s, Throwable t) {
+            evict(options);
+          }
+        },
+        SystemExecutors.getFuturesExecutor());
+    publisher.startAsync().awaitRunning();
+    return publisher;
   }
 
   @VisibleForTesting
-  void set(PublisherOptions options, Publisher<MessageMetadata> toCache) {
-    try (CloseableMonitor.Hold h = monitor.enter()) {
-      livePublishers.put(options, toCache);
-    }
+  synchronized void set(PublisherOptions options, Publisher<MessageMetadata> toCache) {
+    livePublishers.put(options, toCache);
+  }
+
+  @Override
+  public synchronized void close() {
+    livePublishers.forEach(((options, publisher) -> publisher.stopAsync()));
+    livePublishers.clear();
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java
index 34012f7..67ea6cf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java
@@ -17,17 +17,27 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite;
 
+import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
 import static com.google.cloud.pubsublite.internal.UncheckedApiPreconditions.checkArgument;
+import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
+import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
 
 import com.google.api.gax.rpc.ApiException;
 import com.google.cloud.pubsublite.AdminClient;
 import com.google.cloud.pubsublite.AdminClientSettings;
 import com.google.cloud.pubsublite.MessageMetadata;
-import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
 import com.google.cloud.pubsublite.internal.Publisher;
 import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings;
+import com.google.cloud.pubsublite.internal.wire.PubsubContext;
 import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
+import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
 import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
+import com.google.cloud.pubsublite.v1.AdminServiceClient;
+import com.google.cloud.pubsublite.v1.AdminServiceSettings;
+import com.google.cloud.pubsublite.v1.PublisherServiceClient;
+import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken;
 
 class Publishers {
@@ -35,6 +45,38 @@
 
   private Publishers() {}
 
+  private static AdminClient newAdminClient(PublisherOptions options) throws ApiException {
+    try {
+      return AdminClient.create(
+          AdminClientSettings.newBuilder()
+              .setServiceClient(
+                  AdminServiceClient.create(
+                      addDefaultSettings(
+                          options.topicPath().location().extractRegion(),
+                          AdminServiceSettings.newBuilder())))
+              .setRegion(options.topicPath().location().extractRegion())
+              .build());
+    } catch (Throwable t) {
+      throw toCanonical(t).underlying;
+    }
+  }
+
+  private static PublisherServiceClient newServiceClient(
+      PublisherOptions options, Partition partition) {
+    PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder();
+    settingsBuilder =
+        addDefaultMetadata(
+            PubsubContext.of(FRAMEWORK),
+            RoutingMetadata.of(options.topicPath(), partition),
+            settingsBuilder);
+    try {
+      return PublisherServiceClient.create(
+          addDefaultSettings(options.topicPath().location().extractRegion(), settingsBuilder));
+    } catch (Throwable t) {
+      throw toCanonical(t).underlying;
+    }
+  }
+
   @SuppressWarnings("unchecked")
   static Publisher<MessageMetadata> newPublisher(PublisherOptions options) throws ApiException {
     SerializableSupplier<Object> supplier = options.publisherSupplier();
@@ -44,20 +86,18 @@
       checkArgument(token.isSupertypeOf(supplied.getClass()));
       return (Publisher<MessageMetadata>) supplied;
     }
-
-    TopicPath topic = options.topicPath();
-    PartitionCountWatchingPublisherSettings.Builder publisherSettings =
-        PartitionCountWatchingPublisherSettings.newBuilder()
-            .setTopic(topic)
-            .setPublisherFactory(
-                partition ->
-                    SinglePartitionPublisherBuilder.newBuilder()
-                        .setTopic(topic)
-                        .setPartition(partition)
-                        .build())
-            .setAdminClient(
-                AdminClient.create(
-                    AdminClientSettings.newBuilder().setRegion(topic.location().region()).build()));
-    return publisherSettings.build().instantiate();
+    return PartitionCountWatchingPublisherSettings.newBuilder()
+        .setTopic(options.topicPath())
+        .setPublisherFactory(
+            partition ->
+                SinglePartitionPublisherBuilder.newBuilder()
+                    .setTopic(options.topicPath())
+                    .setPartition(partition)
+                    .setServiceClient(newServiceClient(options, partition))
+                    .setBatchingSettings(PublisherSettings.DEFAULT_BATCHING_SETTINGS)
+                    .build())
+        .setAdminClient(newAdminClient(options))
+        .build()
+        .instantiate();
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java
index ca1f2be..b93ac61 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java
@@ -107,7 +107,7 @@
    * }</pre>
    */
   public static PTransform<PCollection<PubSubMessage>, PDone> write(PublisherOptions options) {
-    return new PTransform<PCollection<PubSubMessage>, PDone>("PubsubLiteIO") {
+    return new PTransform<PCollection<PubSubMessage>, PDone>() {
       @Override
       public PDone expand(PCollection<PubSubMessage> input) {
         PubsubLiteSink sink = new PubsubLiteSink(options);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java
index d3acdfa..d0e3afa 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java
@@ -28,16 +28,14 @@
 import com.google.cloud.pubsublite.internal.CheckedApiException;
 import com.google.cloud.pubsublite.internal.ExtractStatus;
 import com.google.cloud.pubsublite.internal.Publisher;
+import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
 import com.google.cloud.pubsublite.proto.PubSubMessage;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import java.util.ArrayDeque;
 import java.util.Deque;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import java.util.function.Consumer;
 import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOrError.Kind;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
 
 /** A sink which publishes messages to Pub/Sub Lite. */
 @SuppressWarnings({
@@ -56,8 +54,6 @@
   @GuardedBy("this")
   private transient Deque<CheckedApiException> errorsSinceLastFinish;
 
-  private static final Executor executor = Executors.newCachedThreadPool();
-
   PubsubLiteSink(PublisherOptions options) {
     this.options = options;
   }
@@ -89,7 +85,7 @@
             onFailure.accept(t);
           }
         },
-        MoreExecutors.directExecutor());
+        SystemExecutors.getFuturesExecutor());
     if (!options.usesCache()) {
       publisher.startAsync();
     }
@@ -130,7 +126,7 @@
             onFailure.accept(t);
           }
         },
-        executor);
+        SystemExecutors.getFuturesExecutor());
   }
 
   // Intentionally don't flush on bundle finish to allow multi-sink client reuse.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java
index 9875880..b6a9f5d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java
@@ -23,6 +23,7 @@
 import com.google.api.gax.rpc.ApiException;
 import com.google.cloud.pubsublite.AdminClient;
 import com.google.cloud.pubsublite.AdminClientSettings;
+import com.google.cloud.pubsublite.Offset;
 import com.google.cloud.pubsublite.Partition;
 import com.google.cloud.pubsublite.TopicPath;
 import com.google.cloud.pubsublite.internal.wire.Committer;
@@ -31,8 +32,6 @@
 import java.util.List;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
-import org.apache.beam.sdk.io.range.OffsetRange;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -54,10 +53,11 @@
     checkArgument(subscriptionPartition.subscription().equals(options.subscriptionPath()));
   }
 
-  private Subscriber newSubscriber(Partition partition, Consumer<List<SequencedMessage>> consumer) {
+  private Subscriber newSubscriber(
+      Partition partition, Offset initialOffset, Consumer<List<SequencedMessage>> consumer) {
     try {
       return options
-          .getSubscriberFactory(partition)
+          .getSubscriberFactory(partition, initialOffset)
           .newSubscriber(
               messages ->
                   consumer.accept(
@@ -71,23 +71,31 @@
 
   private SubscriptionPartitionProcessor newPartitionProcessor(
       SubscriptionPartition subscriptionPartition,
-      RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
+      RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
       OutputReceiver<SequencedMessage> receiver)
       throws ApiException {
     checkSubscription(subscriptionPartition);
     return new SubscriptionPartitionProcessorImpl(
         tracker,
         receiver,
-        consumer -> newSubscriber(subscriptionPartition.partition(), consumer),
+        consumer ->
+            newSubscriber(
+                subscriptionPartition.partition(),
+                Offset.of(tracker.currentRestriction().getRange().getFrom()),
+                consumer),
         options.flowControlSettings());
   }
 
-  private RestrictionTracker<OffsetRange, OffsetByteProgress> newRestrictionTracker(
-      SubscriptionPartition subscriptionPartition, OffsetRange initial) {
+  private TopicBacklogReader newBacklogReader(SubscriptionPartition subscriptionPartition) {
     checkSubscription(subscriptionPartition);
+    return options.getBacklogReader(subscriptionPartition.partition());
+  }
+
+  private TrackerWithProgress newRestrictionTracker(
+      TopicBacklogReader backlogReader, OffsetByteRange initial) {
     return new OffsetByteRangeTracker(
         initial,
-        options.getBacklogReader(subscriptionPartition.partition()),
+        backlogReader,
         Stopwatch.createUnstarted(),
         options.minBundleTimeout(),
         LongMath.saturatedMultiply(options.flowControlSettings().bytesOutstanding(), 10));
@@ -107,7 +115,7 @@
     try (AdminClient admin =
         AdminClient.create(
             AdminClientSettings.newBuilder()
-                .setRegion(options.subscriptionPath().location().region())
+                .setRegion(options.subscriptionPath().location().extractRegion())
                 .build())) {
       return TopicPath.parse(admin.getSubscription(options.subscriptionPath()).get().getTopic());
     } catch (Throwable t) {
@@ -118,25 +126,15 @@
   @Override
   public PCollection<SequencedMessage> expand(PBegin input) {
     PCollection<SubscriptionPartition> subscriptionPartitions;
-    if (options.partitions().isEmpty()) {
-      subscriptionPartitions =
-          input.apply(new SubscriptionPartitionLoader(getTopicPath(), options.subscriptionPath()));
-    } else {
-      subscriptionPartitions =
-          input.apply(
-              Create.of(
-                  options.partitions().stream()
-                      .map(
-                          partition ->
-                              SubscriptionPartition.of(options.subscriptionPath(), partition))
-                      .collect(Collectors.toList())));
-    }
+    subscriptionPartitions =
+        input.apply(new SubscriptionPartitionLoader(getTopicPath(), options.subscriptionPath()));
 
     return subscriptionPartitions.apply(
         ParDo.of(
             new PerSubscriptionPartitionSdf(
                 // Ensure we read for at least 5 seconds more than the bundle timeout.
                 options.minBundleTimeout().plus(Duration.standardSeconds(5)),
+                new ManagedBacklogReaderFactoryImpl(this::newBacklogReader),
                 this::newInitialOffsetReader,
                 this::newRestrictionTracker,
                 this::newPartitionProcessor,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java
index 0d3afe2..a9625be 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java
@@ -23,6 +23,7 @@
 
 import com.google.api.gax.rpc.ApiException;
 import com.google.auto.value.AutoValue;
+import com.google.cloud.pubsublite.Offset;
 import com.google.cloud.pubsublite.Partition;
 import com.google.cloud.pubsublite.SubscriptionPath;
 import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
@@ -35,13 +36,13 @@
 import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
 import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
 import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
+import com.google.cloud.pubsublite.proto.Cursor;
+import com.google.cloud.pubsublite.proto.SeekRequest;
 import com.google.cloud.pubsublite.v1.CursorServiceClient;
 import com.google.cloud.pubsublite.v1.CursorServiceSettings;
 import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
 import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
 import java.io.Serializable;
-import java.util.Set;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 
@@ -69,11 +70,6 @@
   public abstract FlowControlSettings flowControlSettings();
 
   /**
-   * A set of partitions. If empty, continuously poll the set of partitions using an admin client.
-   */
-  public abstract Set<Partition> partitions();
-
-  /**
    * The minimum wall time to pass before allowing bundle closure.
    *
    * <p>Setting this to too small of a value will result in increased compute costs and lower
@@ -108,7 +104,6 @@
   public static Builder newBuilder() {
     Builder builder = new AutoValue_SubscriberOptions.Builder();
     return builder
-        .setPartitions(ImmutableSet.of())
         .setFlowControlSettings(DEFAULT_FLOW_CONTROL)
         .setMinBundleTimeout(MIN_BUNDLE_TIMEOUT);
   }
@@ -119,20 +114,19 @@
       throws ApiException {
     try {
       SubscriberServiceSettings.Builder settingsBuilder = SubscriberServiceSettings.newBuilder();
-
       settingsBuilder =
           addDefaultMetadata(
               PubsubContext.of(FRAMEWORK),
               RoutingMetadata.of(subscriptionPath(), partition),
               settingsBuilder);
       return SubscriberServiceClient.create(
-          addDefaultSettings(subscriptionPath().location().region(), settingsBuilder));
+          addDefaultSettings(subscriptionPath().location().extractRegion(), settingsBuilder));
     } catch (Throwable t) {
       throw toCanonical(t).underlying;
     }
   }
 
-  SubscriberFactory getSubscriberFactory(Partition partition) {
+  SubscriberFactory getSubscriberFactory(Partition partition, Offset initialOffset) {
     SubscriberFactory factory = subscriberFactory();
     if (factory != null) {
       return factory;
@@ -143,6 +137,10 @@
             .setSubscriptionPath(subscriptionPath())
             .setPartition(partition)
             .setServiceClient(newSubscriberServiceClient(partition))
+            .setInitialLocation(
+                SeekRequest.newBuilder()
+                    .setCursor(Cursor.newBuilder().setOffset(initialOffset.value()))
+                    .build())
             .build();
   }
 
@@ -150,7 +148,7 @@
     try {
       return CursorServiceClient.create(
           addDefaultSettings(
-              subscriptionPath().location().region(), CursorServiceSettings.newBuilder()));
+              subscriptionPath().location().extractRegion(), CursorServiceSettings.newBuilder()));
     } catch (Throwable t) {
       throw toCanonical(t).underlying;
     }
@@ -189,7 +187,7 @@
     return new InitialOffsetReaderImpl(
         CursorClient.create(
             CursorClientSettings.newBuilder()
-                .setRegion(subscriptionPath().location().region())
+                .setRegion(subscriptionPath().location().extractRegion())
                 .build()),
         subscriptionPath(),
         partition);
@@ -201,8 +199,6 @@
     public abstract Builder setSubscriptionPath(SubscriptionPath path);
 
     // Optional parameters
-    public abstract Builder setPartitions(Set<Partition> partitions);
-
     public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);
 
     public abstract Builder setMinBundleTimeout(Duration minBundleTimeout);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionLoader.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionLoader.java
index 866e922..e411d80 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionLoader.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionLoader.java
@@ -92,7 +92,9 @@
                     })
                 .withPollInterval(pollDuration)
                 .withTerminationPerInput(
-                    terminate ? Watch.Growth.afterIterations(10) : Watch.Growth.never()));
+                    terminate
+                        ? Watch.Growth.afterTotalOf(pollDuration.multipliedBy(10))
+                        : Watch.Growth.never()));
     return partitions.apply(
         MapElements.into(TypeDescriptor.of(SubscriptionPartition.class))
             .via(kv -> SubscriptionPartition.of(subscription, kv.getValue())));
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java
index 6bf3623..530c180 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java
@@ -19,7 +19,6 @@
 
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import java.io.Serializable;
-import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 
@@ -28,6 +27,6 @@
 
   SubscriptionPartitionProcessor newProcessor(
       SubscriptionPartition subscriptionPartition,
-      RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
+      RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
       OutputReceiver<SequencedMessage> receiver);
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java
index 8d2a137..a086d18 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite;
 
+import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;
+
 import com.google.api.core.ApiService.Listener;
 import com.google.api.core.ApiService.State;
 import com.google.cloud.pubsublite.Offset;
@@ -24,9 +26,8 @@
 import com.google.cloud.pubsublite.internal.CheckedApiException;
 import com.google.cloud.pubsublite.internal.ExtractStatus;
 import com.google.cloud.pubsublite.internal.wire.Subscriber;
-import com.google.cloud.pubsublite.proto.Cursor;
+import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
 import com.google.cloud.pubsublite.proto.FlowControlRequest;
-import com.google.cloud.pubsublite.proto.SeekRequest;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.protobuf.util.Timestamps;
 import java.util.List;
@@ -36,19 +37,17 @@
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 class SubscriptionPartitionProcessorImpl extends Listener
     implements SubscriptionPartitionProcessor {
-  private final RestrictionTracker<OffsetRange, OffsetByteProgress> tracker;
+  private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;
   private final OutputReceiver<SequencedMessage> receiver;
   private final Subscriber subscriber;
   private final SettableFuture<Void> completionFuture = SettableFuture.create();
@@ -57,7 +56,7 @@
 
   @SuppressWarnings("methodref.receiver.bound.invalid")
   SubscriptionPartitionProcessorImpl(
-      RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
+      RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
       OutputReceiver<SequencedMessage> receiver,
       Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory,
       FlowControlSettings flowControlSettings) {
@@ -70,23 +69,15 @@
   @Override
   @SuppressWarnings("argument.type.incompatible")
   public void start() throws CheckedApiException {
-    this.subscriber.addListener(this, MoreExecutors.directExecutor());
+    this.subscriber.addListener(this, SystemExecutors.getFuturesExecutor());
     this.subscriber.startAsync();
     this.subscriber.awaitRunning();
     try {
-      this.subscriber
-          .seek(
-              SeekRequest.newBuilder()
-                  .setCursor(Cursor.newBuilder().setOffset(tracker.currentRestriction().getFrom()))
-                  .build())
-          .get();
       this.subscriber.allowFlow(
           FlowControlRequest.newBuilder()
               .setAllowedBytes(flowControlSettings.bytesOutstanding())
               .setAllowedMessages(flowControlSettings.messagesOutstanding())
               .build());
-    } catch (ExecutionException e) {
-      throw ExtractStatus.toCanonical(e.getCause());
     } catch (Throwable t) {
       throw ExtractStatus.toCanonical(t);
     }
@@ -125,7 +116,7 @@
 
   @Override
   public void close() {
-    subscriber.stopAsync().awaitTerminated();
+    blockingShutdown(subscriber);
   }
 
   @Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java
index 8c1dd94..79db0f1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java
@@ -62,7 +62,7 @@
       try (AdminClient adminClient =
           AdminClient.create(
               AdminClientSettings.newBuilder()
-                  .setRegion(subscriptionPath.location().region())
+                  .setRegion(subscriptionPath.location().extractRegion())
                   .build())) {
         return setTopicPath(
             TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic()));
@@ -81,7 +81,9 @@
 
   TopicBacklogReader instantiate() throws ApiException {
     TopicStatsClientSettings settings =
-        TopicStatsClientSettings.newBuilder().setRegion(topicPath().location().region()).build();
+        TopicStatsClientSettings.newBuilder()
+            .setRegion(topicPath().location().extractRegion())
+            .build();
     TopicBacklogReader impl =
         new TopicBacklogReaderImpl(TopicStatsClient.create(settings), topicPath(), partition());
     return new LimitingTopicBacklogReader(impl, Ticker.systemTicker());
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TrackerWithProgress.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TrackerWithProgress.java
new file mode 100644
index 0000000..7f0d030
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TrackerWithProgress.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+
+public abstract class TrackerWithProgress
+    extends RestrictionTracker<OffsetByteRange, OffsetByteProgress> implements HasProgress {}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java
index f34ebb6..5a31f4f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java
@@ -49,7 +49,7 @@
   private static final double IGNORED_FRACTION = -10000000.0;
   private static final long MIN_BYTES = 1000;
   private static final OffsetRange RANGE = new OffsetRange(123L, Long.MAX_VALUE);
-  private final TopicBacklogReader reader = mock(TopicBacklogReader.class);
+  private final TopicBacklogReader unownedBacklogReader = mock(TopicBacklogReader.class);
 
   @Spy Ticker ticker;
   private OffsetByteRangeTracker tracker;
@@ -60,14 +60,18 @@
     when(ticker.read()).thenReturn(0L);
     tracker =
         new OffsetByteRangeTracker(
-            RANGE, reader, Stopwatch.createUnstarted(ticker), Duration.millis(500), MIN_BYTES);
+            OffsetByteRange.of(RANGE, 0),
+            unownedBacklogReader,
+            Stopwatch.createUnstarted(ticker),
+            Duration.millis(500),
+            MIN_BYTES);
   }
 
   @Test
   public void progressTracked() {
     assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(123), 10)));
     assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(124), 11)));
-    when(reader.computeMessageStats(Offset.of(125)))
+    when(unownedBacklogReader.computeMessageStats(Offset.of(125)))
         .thenReturn(ComputeMessageStatsResponse.newBuilder().setMessageBytes(1000).build());
     Progress progress = tracker.getProgress();
     assertEquals(21, progress.getWorkCompleted(), .0001);
@@ -76,7 +80,7 @@
 
   @Test
   public void getProgressStatsFailure() {
-    when(reader.computeMessageStats(Offset.of(123)))
+    when(unownedBacklogReader.computeMessageStats(Offset.of(123)))
         .thenThrow(new CheckedApiException(Code.INTERNAL).underlying);
     assertThrows(ApiException.class, tracker::getProgress);
   }
@@ -86,11 +90,15 @@
   public void claimSplitSuccess() {
     assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_000), MIN_BYTES)));
     assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(10_000), MIN_BYTES)));
-    SplitResult<OffsetRange> splits = tracker.trySplit(IGNORED_FRACTION);
-    assertEquals(RANGE.getFrom(), splits.getPrimary().getFrom());
-    assertEquals(10_001, splits.getPrimary().getTo());
-    assertEquals(10_001, splits.getResidual().getFrom());
-    assertEquals(Long.MAX_VALUE, splits.getResidual().getTo());
+    SplitResult<OffsetByteRange> splits = tracker.trySplit(IGNORED_FRACTION);
+    OffsetByteRange primary = splits.getPrimary();
+    assertEquals(RANGE.getFrom(), primary.getRange().getFrom());
+    assertEquals(10_001, primary.getRange().getTo());
+    assertEquals(MIN_BYTES * 2, primary.getByteCount());
+    OffsetByteRange residual = splits.getResidual();
+    assertEquals(10_001, residual.getRange().getFrom());
+    assertEquals(Long.MAX_VALUE, residual.getRange().getTo());
+    assertEquals(0, residual.getByteCount());
     assertEquals(splits.getPrimary(), tracker.currentRestriction());
     tracker.checkDone();
     assertNull(tracker.trySplit(IGNORED_FRACTION));
@@ -100,10 +108,10 @@
   @SuppressWarnings({"dereference.of.nullable", "argument.type.incompatible"})
   public void splitWithoutClaimEmpty() {
     when(ticker.read()).thenReturn(100000000000000L);
-    SplitResult<OffsetRange> splits = tracker.trySplit(IGNORED_FRACTION);
-    assertEquals(RANGE.getFrom(), splits.getPrimary().getFrom());
-    assertEquals(RANGE.getFrom(), splits.getPrimary().getTo());
-    assertEquals(RANGE, splits.getResidual());
+    SplitResult<OffsetByteRange> splits = tracker.trySplit(IGNORED_FRACTION);
+    assertEquals(RANGE.getFrom(), splits.getPrimary().getRange().getFrom());
+    assertEquals(RANGE.getFrom(), splits.getPrimary().getRange().getTo());
+    assertEquals(RANGE, splits.getResidual().getRange());
     assertEquals(splits.getPrimary(), tracker.currentRestriction());
     tracker.checkDone();
     assertNull(tracker.trySplit(IGNORED_FRACTION));
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java
index 598037e..0a4e3e7 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java
@@ -28,6 +28,7 @@
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
 import static org.mockito.MockitoAnnotations.initMocks;
 
@@ -51,6 +52,8 @@
 import org.apache.beam.sdk.transforms.SerializableBiFunction;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.DoubleMath;
 import org.joda.time.Duration;
 import org.junit.Before;
 import org.junit.Test;
@@ -65,22 +68,24 @@
 public class PerSubscriptionPartitionSdfTest {
   private static final Duration MAX_SLEEP_TIME =
       Duration.standardMinutes(10).plus(Duration.millis(10));
-  private static final OffsetRange RESTRICTION = new OffsetRange(1, Long.MAX_VALUE);
+  private static final OffsetByteRange RESTRICTION =
+      OffsetByteRange.of(new OffsetRange(1, Long.MAX_VALUE), 0);
   private static final SubscriptionPartition PARTITION =
       SubscriptionPartition.of(example(SubscriptionPath.class), example(Partition.class));
 
   @Mock SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory;
 
+  @Mock ManagedBacklogReaderFactory backlogReaderFactory;
+  @Mock TopicBacklogReader backlogReader;
+
   @Mock
-  SerializableBiFunction<
-          SubscriptionPartition, OffsetRange, RestrictionTracker<OffsetRange, OffsetByteProgress>>
-      trackerFactory;
+  SerializableBiFunction<TopicBacklogReader, OffsetByteRange, TrackerWithProgress> trackerFactory;
 
   @Mock SubscriptionPartitionProcessorFactory processorFactory;
   @Mock SerializableFunction<SubscriptionPartition, Committer> committerFactory;
 
   @Mock InitialOffsetReader initialOffsetReader;
-  @Spy RestrictionTracker<OffsetRange, OffsetByteProgress> tracker;
+  @Spy TrackerWithProgress tracker;
   @Mock OutputReceiver<SequencedMessage> output;
   @Mock SubscriptionPartitionProcessor processor;
 
@@ -98,9 +103,11 @@
     when(trackerFactory.apply(any(), any())).thenReturn(tracker);
     when(committerFactory.apply(any())).thenReturn(committer);
     when(tracker.currentRestriction()).thenReturn(RESTRICTION);
+    when(backlogReaderFactory.newReader(any())).thenReturn(backlogReader);
     sdf =
         new PerSubscriptionPartitionSdf(
             MAX_SLEEP_TIME,
+            backlogReaderFactory,
             offsetReaderFactory,
             trackerFactory,
             processorFactory,
@@ -110,9 +117,10 @@
   @Test
   public void getInitialRestrictionReadSuccess() {
     when(initialOffsetReader.read()).thenReturn(example(Offset.class));
-    OffsetRange range = sdf.getInitialRestriction(PARTITION);
-    assertEquals(example(Offset.class).value(), range.getFrom());
-    assertEquals(Long.MAX_VALUE, range.getTo());
+    OffsetByteRange range = sdf.getInitialRestriction(PARTITION);
+    assertEquals(example(Offset.class).value(), range.getRange().getFrom());
+    assertEquals(Long.MAX_VALUE, range.getRange().getTo());
+    assertEquals(0, range.getByteCount());
     verify(offsetReaderFactory).apply(PARTITION);
   }
 
@@ -125,7 +133,13 @@
   @Test
   public void newTrackerCallsFactory() {
     assertSame(tracker, sdf.newTracker(PARTITION, RESTRICTION));
-    verify(trackerFactory).apply(PARTITION, RESTRICTION);
+    verify(trackerFactory).apply(backlogReader, RESTRICTION);
+  }
+
+  @Test
+  public void tearDownClosesBacklogReaderFactory() {
+    sdf.teardown();
+    verify(backlogReaderFactory).close();
   }
 
   @Test
@@ -159,12 +173,48 @@
     order2.verify(committer).awaitTerminated();
   }
 
+  private static final class NoopManagedBacklogReaderFactory
+      implements ManagedBacklogReaderFactory {
+    @Override
+    public TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition) {
+      return null;
+    }
+
+    @Override
+    public void close() {}
+  }
+
   @Test
   @SuppressWarnings("return.type.incompatible")
   public void dofnIsSerializable() throws Exception {
     ObjectOutputStream output = new ObjectOutputStream(new ByteArrayOutputStream());
     output.writeObject(
         new PerSubscriptionPartitionSdf(
-            MAX_SLEEP_TIME, x -> null, (x, y) -> null, (x, y, z) -> null, (x) -> null));
+            MAX_SLEEP_TIME,
+            new NoopManagedBacklogReaderFactory(),
+            x -> null,
+            (x, y) -> null,
+            (x, y, z) -> null,
+            (x) -> null));
+  }
+
+  @Test
+  public void getProgressUnboundedRangeDelegates() {
+    Progress progress = Progress.from(0, 0.2);
+    when(tracker.getProgress()).thenReturn(progress);
+    assertTrue(
+        DoubleMath.fuzzyEquals(
+            progress.getWorkRemaining(), sdf.getSize(PARTITION, RESTRICTION), .0001));
+    verify(tracker).getProgress();
+  }
+
+  @Test
+  public void getProgressBoundedReturnsBytes() {
+    assertTrue(
+        DoubleMath.fuzzyEquals(
+            123.0,
+            sdf.getSize(PARTITION, OffsetByteRange.of(new OffsetRange(87, 8000), 123)),
+            .0001));
+    verifyNoInteractions(tracker);
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
new file mode 100644
index 0000000..e242942
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.junit.Assert.fail;
+
+import com.google.cloud.pubsublite.AdminClient;
+import com.google.cloud.pubsublite.AdminClientSettings;
+import com.google.cloud.pubsublite.BacklogLocation;
+import com.google.cloud.pubsublite.CloudZone;
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.ProjectId;
+import com.google.cloud.pubsublite.SubscriptionName;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.TopicName;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.proto.PubSubMessage;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import com.google.cloud.pubsublite.proto.Subscription;
+import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig.DeliveryRequirement;
+import com.google.cloud.pubsublite.proto.Topic;
+import com.google.cloud.pubsublite.proto.Topic.PartitionConfig.Capacity;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import com.google.protobuf.ByteString;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.FlatMapElements;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(JUnit4.class)
+public class ReadWriteIT {
+  private static final Logger LOG = LoggerFactory.getLogger(ReadWriteIT.class);
+  private static final CloudZone ZONE = CloudZone.parse("us-central1-b");
+  private static final int MESSAGE_COUNT = 90;
+
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+  private static ProjectId getProject(PipelineOptions options) {
+    return ProjectId.of(checkArgumentNotNull(options.as(GcpOptions.class).getProject()));
+  }
+
+  private static String randomName() {
+    return "beam_it_resource_" + ThreadLocalRandom.current().nextLong();
+  }
+
+  private static AdminClient newAdminClient() {
+    return AdminClient.create(AdminClientSettings.newBuilder().setRegion(ZONE.region()).build());
+  }
+
+  private final Deque<Runnable> cleanupActions = new ArrayDeque<>();
+
+  private TopicPath createTopic(ProjectId id) throws Exception {
+    TopicPath toReturn =
+        TopicPath.newBuilder()
+            .setProject(id)
+            .setLocation(ZONE)
+            .setName(TopicName.of(randomName()))
+            .build();
+    Topic.Builder topic = Topic.newBuilder().setName(toReturn.toString());
+    topic
+        .getPartitionConfigBuilder()
+        .setCount(2)
+        .setCapacity(Capacity.newBuilder().setPublishMibPerSec(4).setSubscribeMibPerSec(4));
+    topic.getRetentionConfigBuilder().setPerPartitionBytes(30 * (1L << 30));
+    cleanupActions.addLast(
+        () -> {
+          try (AdminClient client = newAdminClient()) {
+            client.deleteTopic(toReturn).get();
+          } catch (Throwable t) {
+            LOG.error("Failed to clean up topic.", t);
+          }
+        });
+    try (AdminClient client = newAdminClient()) {
+      client.createTopic(topic.build()).get();
+    }
+    return toReturn;
+  }
+
+  private SubscriptionPath createSubscription(TopicPath topic) throws Exception {
+    SubscriptionPath toReturn =
+        SubscriptionPath.newBuilder()
+            .setProject(topic.project())
+            .setLocation(ZONE)
+            .setName(SubscriptionName.of(randomName()))
+            .build();
+    Subscription.Builder subscription = Subscription.newBuilder().setName(toReturn.toString());
+    subscription
+        .getDeliveryConfigBuilder()
+        .setDeliveryRequirement(DeliveryRequirement.DELIVER_IMMEDIATELY);
+    subscription.setTopic(topic.toString());
+    cleanupActions.addLast(
+        () -> {
+          try (AdminClient client = newAdminClient()) {
+            client.deleteSubscription(toReturn).get();
+          } catch (Throwable t) {
+            LOG.error("Failed to clean up subscription.", t);
+          }
+        });
+    try (AdminClient client = newAdminClient()) {
+      client.createSubscription(subscription.build(), BacklogLocation.BEGINNING).get();
+    }
+    return toReturn;
+  }
+
+  @After
+  public void tearDown() {
+    while (!cleanupActions.isEmpty()) {
+      cleanupActions.removeLast().run();
+    }
+  }
+
+  // Workaround for BEAM-12867
+  // TODO(BEAM-12867): Remove this.
+  private static class CustomCreate extends PTransform<PCollection<Void>, PCollection<Integer>> {
+    @Override
+    public PCollection<Integer> expand(PCollection<Void> input) {
+      return input.apply(
+          "createIndexes",
+          FlatMapElements.via(
+              new SimpleFunction<Void, Iterable<Integer>>() {
+                @Override
+                public Iterable<Integer> apply(Void input) {
+                  return IntStream.range(0, MESSAGE_COUNT).boxed().collect(Collectors.toList());
+                }
+              }));
+    }
+  }
+
+  public static void writeMessages(TopicPath topicPath, Pipeline pipeline) {
+    PCollection<Void> trigger = pipeline.apply(Create.of((Void) null));
+    PCollection<Integer> indexes = trigger.apply("createIndexes", new CustomCreate());
+    PCollection<PubSubMessage> messages =
+        indexes.apply(
+            "createMessages",
+            MapElements.via(
+                new SimpleFunction<Integer, PubSubMessage>(
+                    index ->
+                        Message.builder()
+                            .setData(ByteString.copyFromUtf8(index.toString()))
+                            .build()
+                            .toProto()) {}));
+    // Add UUIDs to messages for later deduplication.
+    messages = messages.apply("addUuids", PubsubLiteIO.addUuids());
+    messages.apply(
+        "writeMessages",
+        PubsubLiteIO.write(PublisherOptions.newBuilder().setTopicPath(topicPath).build()));
+  }
+
+  public static PCollection<SequencedMessage> readMessages(
+      SubscriptionPath subscriptionPath, Pipeline pipeline) {
+    PCollection<SequencedMessage> messages =
+        pipeline.apply(
+            "readMessages",
+            PubsubLiteIO.read(
+                SubscriberOptions.newBuilder()
+                    .setSubscriptionPath(subscriptionPath)
+                    // setMinBundleTimeout INTENDED FOR TESTING ONLY
+                    // This sacrifices efficiency to make tests run faster. Do not use this in a
+                    // real pipeline!
+                    .setMinBundleTimeout(Duration.standardSeconds(5))
+                    .build()));
+    // Deduplicate messages based on the uuids added in PubsubLiteIO.addUuids() when writing.
+    return messages.apply(
+        "dedupeMessages", PubsubLiteIO.deduplicate(UuidDeduplicationOptions.newBuilder().build()));
+  }
+
+  // This static out of band communication is needed to retain serializability.
+  @GuardedBy("ReadWriteIT.class")
+  private static final List<SequencedMessage> received = new ArrayList<>();
+
+  private static synchronized void addMessageReceived(SequencedMessage message) {
+    received.add(message);
+  }
+
+  private static synchronized List<SequencedMessage> getTestQuickstartReceived() {
+    return ImmutableList.copyOf(received);
+  }
+
+  private static PTransform<PCollection<? extends SequencedMessage>, PCollection<Void>>
+      collectTestQuickstart() {
+    return MapElements.via(
+        new SimpleFunction<SequencedMessage, Void>() {
+          @Override
+          public Void apply(SequencedMessage input) {
+            addMessageReceived(input);
+            return null;
+          }
+        });
+  }
+
+  @Test
+  public void testReadWrite() throws Exception {
+    pipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
+    pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);
+
+    TopicPath topic = createTopic(getProject(pipeline.getOptions()));
+    SubscriptionPath subscription = createSubscription(topic);
+
+    // Publish some messages
+    writeMessages(topic, pipeline);
+
+    // Read some messages. They should be deduplicated by the time we see them, so there should be
+    // exactly numMessages, one for every index in [0,MESSAGE_COUNT).
+    PCollection<SequencedMessage> messages = readMessages(subscription, pipeline);
+    messages.apply("messageReceiver", collectTestQuickstart());
+    pipeline.run();
+    LOG.info("Running!");
+    for (int round = 0; round < 120; ++round) {
+      Thread.sleep(1000);
+      Map<Integer, Integer> receivedCounts = new HashMap<>();
+      for (SequencedMessage message : getTestQuickstartReceived()) {
+        int id = Integer.parseInt(message.getMessage().getData().toStringUtf8());
+        receivedCounts.put(id, receivedCounts.getOrDefault(id, 0) + 1);
+      }
+      LOG.info("Performing comparison round {}.\n", round);
+      boolean done = true;
+      List<Integer> missing = new ArrayList<>();
+      for (int id = 0; id < MESSAGE_COUNT; id++) {
+        int idCount = receivedCounts.getOrDefault(id, 0);
+        if (idCount == 0) {
+          missing.add(id);
+          done = false;
+        }
+        if (idCount > 1) {
+          fail(String.format("Failed to deduplicate message with id %s.", id));
+        }
+      }
+      LOG.info("Still messing messages: {}.\n", missing);
+      if (done) {
+        return;
+      }
+    }
+    fail(
+        String.format(
+            "Failed to receive all messages after 2 minutes. Received %s messages.",
+            getTestQuickstartReceived().size()));
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java
index dbf3b93..3d743758 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java
@@ -31,7 +31,6 @@
 import static org.mockito.Mockito.when;
 import static org.mockito.MockitoAnnotations.initMocks;
 
-import com.google.api.core.ApiFutures;
 import com.google.api.gax.rpc.ApiException;
 import com.google.api.gax.rpc.StatusCode.Code;
 import com.google.cloud.pubsublite.Offset;
@@ -40,7 +39,6 @@
 import com.google.cloud.pubsublite.internal.wire.Subscriber;
 import com.google.cloud.pubsublite.proto.Cursor;
 import com.google.cloud.pubsublite.proto.FlowControlRequest;
-import com.google.cloud.pubsublite.proto.SeekRequest;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.protobuf.util.Timestamps;
 import java.util.List;
@@ -64,7 +62,7 @@
 @RunWith(JUnit4.class)
 @SuppressWarnings("initialization.fields.uninitialized")
 public class SubscriptionPartitionProcessorImplTest {
-  @Spy RestrictionTracker<OffsetRange, OffsetByteProgress> tracker;
+  @Spy RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;
   @Mock OutputReceiver<SequencedMessage> receiver;
   @Mock Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory;
 
@@ -83,6 +81,10 @@
         .build();
   }
 
+  private OffsetByteRange initialRange() {
+    return OffsetByteRange.of(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
+  }
+
   @Before
   public void setUp() {
     initMocks(this);
@@ -100,18 +102,11 @@
 
   @Test
   public void lifecycle() throws Exception {
-    when(tracker.currentRestriction())
-        .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
-    when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class)));
+    when(tracker.currentRestriction()).thenReturn(initialRange());
     processor.start();
     verify(subscriber).startAsync();
     verify(subscriber).awaitRunning();
     verify(subscriber)
-        .seek(
-            SeekRequest.newBuilder()
-                .setCursor(Cursor.newBuilder().setOffset(example(Offset.class).value()))
-                .build());
-    verify(subscriber)
         .allowFlow(
             FlowControlRequest.newBuilder()
                 .setAllowedBytes(DEFAULT_FLOW_CONTROL.bytesOutstanding())
@@ -123,29 +118,15 @@
   }
 
   @Test
-  public void lifecycleSeekThrows() throws Exception {
-    when(tracker.currentRestriction())
-        .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
-    when(subscriber.seek(any()))
-        .thenReturn(ApiFutures.immediateFailedFuture(new CheckedApiException(Code.OUT_OF_RANGE)));
+  public void lifecycleFlowControlThrows() throws Exception {
+    when(tracker.currentRestriction()).thenReturn(initialRange());
     doThrow(new CheckedApiException(Code.OUT_OF_RANGE)).when(subscriber).allowFlow(any());
     assertThrows(CheckedApiException.class, () -> processor.start());
   }
 
   @Test
-  public void lifecycleFlowControlThrows() {
-    when(tracker.currentRestriction())
-        .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
-    when(subscriber.seek(any()))
-        .thenReturn(ApiFutures.immediateFailedFuture(new CheckedApiException(Code.OUT_OF_RANGE)));
-    assertThrows(CheckedApiException.class, () -> processor.start());
-  }
-
-  @Test
   public void lifecycleSubscriberAwaitThrows() throws Exception {
-    when(tracker.currentRestriction())
-        .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
-    when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class)));
+    when(tracker.currentRestriction()).thenReturn(initialRange());
     processor.start();
     doThrow(new CheckedApiException(Code.INTERNAL).underlying).when(subscriber).awaitTerminated();
     assertThrows(ApiException.class, () -> processor.close());
@@ -155,21 +136,19 @@
 
   @Test
   public void subscriberFailureFails() throws Exception {
-    when(tracker.currentRestriction())
-        .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
-    when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class)));
+    when(tracker.currentRestriction()).thenReturn(initialRange());
     processor.start();
     subscriber.fail(new CheckedApiException(Code.OUT_OF_RANGE));
     ApiException e =
-        assertThrows(ApiException.class, () -> processor.waitForCompletion(Duration.ZERO));
+        assertThrows(
+            // Longer wait is needed due to listener asynchrony.
+            ApiException.class, () -> processor.waitForCompletion(Duration.standardSeconds(1)));
     assertEquals(Code.OUT_OF_RANGE, e.getStatusCode().getCode());
   }
 
   @Test
   public void allowFlowFailureFails() throws Exception {
-    when(tracker.currentRestriction())
-        .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
-    when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class)));
+    when(tracker.currentRestriction()).thenReturn(initialRange());
     processor.start();
     when(tracker.tryClaim(any())).thenReturn(true);
     doThrow(new CheckedApiException(Code.OUT_OF_RANGE)).when(subscriber).allowFlow(any());