Merge pull request #15629 from apache/revert-15608-dpcollins-cherry-pick

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 480a2d2..59ca672 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -446,7 +446,7 @@
     def errorprone_version = "2.3.4"
     def google_clients_version = "1.31.0"
     def google_cloud_bigdataoss_version = "2.2.2"
-    def google_cloud_pubsublite_version = "1.0.4"
+    def google_cloud_pubsublite_version = "0.13.2"
     def google_code_gson_version = "2.8.6"
     def google_oauth_clients_version = "1.31.0"
     // Try to keep grpc_version consistent with gRPC version in google_cloud_platform_libraries_bom
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java
index bf6a288..b0cc681 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java
@@ -24,36 +24,25 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 
 /** Common util functions for converting between PubsubMessage proto and {@link PubsubMessage}. */
-public final class PubsubMessages {
-  private PubsubMessages() {}
-
-  public static com.google.pubsub.v1.PubsubMessage toProto(PubsubMessage input) {
-    Map<String, String> attributes = input.getAttributeMap();
-    com.google.pubsub.v1.PubsubMessage.Builder message =
-        com.google.pubsub.v1.PubsubMessage.newBuilder()
-            .setData(ByteString.copyFrom(input.getPayload()));
-    // TODO(BEAM-8085) this should not be null
-    if (attributes != null) {
-      message.putAllAttributes(attributes);
-    }
-    String messageId = input.getMessageId();
-    if (messageId != null) {
-      message.setMessageId(messageId);
-    }
-    return message.build();
-  }
-
-  public static PubsubMessage fromProto(com.google.pubsub.v1.PubsubMessage input) {
-    return new PubsubMessage(
-        input.getData().toByteArray(), input.getAttributesMap(), input.getMessageId());
-  }
-
+public class PubsubMessages {
   // Convert the PubsubMessage to a PubsubMessage proto, then return its serialized representation.
   public static class ParsePayloadAsPubsubMessageProto
       implements SerializableFunction<PubsubMessage, byte[]> {
     @Override
     public byte[] apply(PubsubMessage input) {
-      return toProto(input).toByteArray();
+      Map<String, String> attributes = input.getAttributeMap();
+      com.google.pubsub.v1.PubsubMessage.Builder message =
+          com.google.pubsub.v1.PubsubMessage.newBuilder()
+              .setData(ByteString.copyFrom(input.getPayload()));
+      // TODO(BEAM-8085) the attribute map should not be null
+      if (attributes != null) {
+        message.putAllAttributes(attributes);
+      }
+      String messageId = input.getMessageId();
+      if (messageId != null) {
+        message.setMessageId(messageId);
+      }
+      return message.build().toByteArray();
     }
   }
 
@@ -65,7 +54,8 @@
       try {
         com.google.pubsub.v1.PubsubMessage message =
             com.google.pubsub.v1.PubsubMessage.parseFrom(input);
-        return fromProto(message);
+        return new PubsubMessage(
+            message.getData().toByteArray(), message.getAttributesMap(), message.getMessageId());
       } catch (InvalidProtocolBufferException e) {
         throw new RuntimeException("Could not decode Pubsub message", e);
       }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubChecks.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubChecks.java
new file mode 100644
index 0000000..6dc1516
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubChecks.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.proto.PubSubMessage;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A class providing a conversion validity check between Cloud Pub/Sub and Pub/Sub Lite message
+ * types.
+ */
+public final class CloudPubsubChecks {
+  private CloudPubsubChecks() {}
+
+  /**
+   * Ensure that all messages that pass through can be converted to Cloud Pub/Sub messages using the
+   * standard transformation methods in the client library.
+   *
+   * <p>Will fail the pipeline if a message has multiple values for a single attribute key.
+   */
+  public static PTransform<PCollection<? extends PubSubMessage>, PCollection<PubSubMessage>>
+      ensureUsableAsCloudPubsub() {
+    return MapElements.into(TypeDescriptor.of(PubSubMessage.class))
+        .via(
+            message -> {
+              Object unused = toCpsPublishTransformer().transform(Message.fromProto(message));
+              return message;
+            });
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java
deleted file mode 100644
index 1140c11..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer;
-import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer;
-import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer;
-
-import com.google.cloud.pubsublite.Message;
-import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor;
-import com.google.cloud.pubsublite.proto.PubSubMessage;
-import com.google.cloud.pubsublite.proto.SequencedMessage;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/** A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message types. */
-public final class CloudPubsubTransforms {
-  private CloudPubsubTransforms() {}
-  /**
-   * Ensure that all messages that pass through can be converted to Cloud Pub/Sub messages using the
-   * standard transformation methods in the client library.
-   *
-   * <p>Will fail the pipeline if a message has multiple attributes per key.
-   */
-  public static PTransform<PCollection<PubSubMessage>, PCollection<PubSubMessage>>
-      ensureUsableAsCloudPubsub() {
-    return new PTransform<PCollection<PubSubMessage>, PCollection<PubSubMessage>>() {
-      @Override
-      public PCollection<PubSubMessage> expand(PCollection<PubSubMessage> input) {
-        return input.apply(
-            MapElements.into(TypeDescriptor.of(PubSubMessage.class))
-                .via(
-                    message -> {
-                      Object unused =
-                          toCpsPublishTransformer().transform(Message.fromProto(message));
-                      return message;
-                    }));
-      }
-    };
-  }
-
-  /**
-   * Transform messages read from Pub/Sub Lite to their equivalent Cloud Pub/Sub Message that would
-   * have been read from PubsubIO.
-   *
-   * <p>Will fail the pipeline if a message has multiple attributes per map key.
-   */
-  public static PTransform<PCollection<SequencedMessage>, PCollection<PubsubMessage>>
-      toCloudPubsubMessages() {
-    return new PTransform<PCollection<SequencedMessage>, PCollection<PubsubMessage>>() {
-      @Override
-      public PCollection<PubsubMessage> expand(PCollection<SequencedMessage> input) {
-        return input.apply(
-            MapElements.into(TypeDescriptor.of(PubsubMessage.class))
-                .via(
-                    message ->
-                        PubsubMessages.fromProto(
-                            toCpsSubscribeTransformer()
-                                .transform(
-                                    com.google.cloud.pubsublite.SequencedMessage.fromProto(
-                                        message)))));
-      }
-    };
-  }
-
-  /**
-   * Transform messages publishable using PubsubIO to their equivalent Pub/Sub Lite publishable
-   * message.
-   */
-  public static PTransform<PCollection<PubsubMessage>, PCollection<PubSubMessage>>
-      fromCloudPubsubMessages() {
-    return new PTransform<PCollection<PubsubMessage>, PCollection<PubSubMessage>>() {
-      @Override
-      public PCollection<PubSubMessage> expand(PCollection<PubsubMessage> input) {
-        return input.apply(
-            MapElements.into(TypeDescriptor.of(PubSubMessage.class))
-                .via(
-                    message ->
-                        fromCpsPublishTransformer(KeyExtractor.DEFAULT)
-                            .transform(PubsubMessages.toProto(message))
-                            .toProto()));
-      }
-    };
-  }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactory.java
deleted file mode 100644
index de0cf43..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import java.io.Serializable;
-
-/**
- * A ManagedBacklogReaderFactory produces TopicBacklogReaders and tears down any produced readers
- * when it is itself closed.
- *
- * <p>close() should never be called on produced readers.
- */
-public interface ManagedBacklogReaderFactory extends AutoCloseable, Serializable {
-  TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition);
-
-  @Override
-  void close();
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactoryImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactoryImpl.java
deleted file mode 100644
index 9a337bf..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactoryImpl.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import com.google.api.gax.rpc.ApiException;
-import com.google.cloud.pubsublite.Offset;
-import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
-import java.util.HashMap;
-import java.util.Map;
-import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-
-public class ManagedBacklogReaderFactoryImpl implements ManagedBacklogReaderFactory {
-  private final SerializableFunction<SubscriptionPartition, TopicBacklogReader> newReader;
-
-  @GuardedBy("this")
-  private final Map<SubscriptionPartition, TopicBacklogReader> readers = new HashMap<>();
-
-  ManagedBacklogReaderFactoryImpl(
-      SerializableFunction<SubscriptionPartition, TopicBacklogReader> newReader) {
-    this.newReader = newReader;
-  }
-
-  private static final class NonCloseableTopicBacklogReader implements TopicBacklogReader {
-    private final TopicBacklogReader underlying;
-
-    NonCloseableTopicBacklogReader(TopicBacklogReader underlying) {
-      this.underlying = underlying;
-    }
-
-    @Override
-    public ComputeMessageStatsResponse computeMessageStats(Offset offset) throws ApiException {
-      return underlying.computeMessageStats(offset);
-    }
-
-    @Override
-    public void close() {
-      throw new IllegalArgumentException(
-          "Cannot call close() on a reader returned from ManagedBacklogReaderFactory.");
-    }
-  }
-
-  @Override
-  public synchronized TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition) {
-    return new NonCloseableTopicBacklogReader(
-        readers.computeIfAbsent(subscriptionPartition, newReader::apply));
-  }
-
-  @Override
-  public synchronized void close() {
-    readers.values().forEach(TopicBacklogReader::close);
-  }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRange.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRange.java
deleted file mode 100644
index b39d87e..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRange.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import com.google.auto.value.AutoValue;
-import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.io.range.OffsetRange;
-
-@AutoValue
-@DefaultCoder(OffsetByteRangeCoder.class)
-abstract class OffsetByteRange {
-  abstract OffsetRange getRange();
-
-  abstract long getByteCount();
-
-  static OffsetByteRange of(OffsetRange range, long byteCount) {
-    return new AutoValue_OffsetByteRange(range, byteCount);
-  }
-
-  static OffsetByteRange of(OffsetRange range) {
-    return of(range, 0);
-  }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeCoder.java
deleted file mode 100644
index 076cda1..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeCoder.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderProvider;
-import org.apache.beam.sdk.coders.CoderProviders;
-import org.apache.beam.sdk.coders.DelegateCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.range.OffsetRange;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-public class OffsetByteRangeCoder extends AtomicCoder<OffsetByteRange> {
-  private static final Coder<OffsetByteRange> CODER =
-      DelegateCoder.of(
-          KvCoder.of(OffsetRange.Coder.of(), VarLongCoder.of()),
-          OffsetByteRangeCoder::toKv,
-          OffsetByteRangeCoder::fromKv);
-
-  private static KV<OffsetRange, Long> toKv(OffsetByteRange value) {
-    return KV.of(value.getRange(), value.getByteCount());
-  }
-
-  private static OffsetByteRange fromKv(KV<OffsetRange, Long> kv) {
-    return OffsetByteRange.of(kv.getKey(), kv.getValue());
-  }
-
-  @Override
-  public void encode(OffsetByteRange value, OutputStream outStream) throws IOException {
-    CODER.encode(value, outStream);
-  }
-
-  @Override
-  public OffsetByteRange decode(InputStream inStream) throws IOException {
-    return CODER.decode(inStream);
-  }
-
-  public static CoderProvider getCoderProvider() {
-    return CoderProviders.forCoder(
-        TypeDescriptor.of(OffsetByteRange.class), new OffsetByteRangeCoder());
-  }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java
index da9aaaa..608af8f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java
@@ -26,6 +26,8 @@
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
 import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
 import org.joda.time.Duration;
@@ -42,27 +44,24 @@
  * received. IMPORTANT: minTrackingTime must be strictly smaller than the SDF read timeout when it
  * would return ProcessContinuation.resume().
  */
-class OffsetByteRangeTracker extends TrackerWithProgress {
-  private final TopicBacklogReader unownedBacklogReader;
+class OffsetByteRangeTracker extends RestrictionTracker<OffsetRange, OffsetByteProgress>
+    implements HasProgress {
+  private final TopicBacklogReader backlogReader;
   private final Duration minTrackingTime;
   private final long minBytesReceived;
   private final Stopwatch stopwatch;
-  private OffsetByteRange range;
+  private OffsetRange range;
   private @Nullable Long lastClaimed;
+  private long byteCount = 0;
 
   public OffsetByteRangeTracker(
-      OffsetByteRange range,
-      TopicBacklogReader unownedBacklogReader,
+      OffsetRange range,
+      TopicBacklogReader backlogReader,
       Stopwatch stopwatch,
       Duration minTrackingTime,
       long minBytesReceived) {
-    checkArgument(
-        range.getRange().getTo() == Long.MAX_VALUE,
-        "May only construct OffsetByteRangeTracker with an unbounded range with no progress.");
-    checkArgument(
-        range.getByteCount() == 0L,
-        "May only construct OffsetByteRangeTracker with an unbounded range with no progress.");
-    this.unownedBacklogReader = unownedBacklogReader;
+    checkArgument(range.getTo() == Long.MAX_VALUE);
+    this.backlogReader = backlogReader;
     this.minTrackingTime = minTrackingTime;
     this.minBytesReceived = minBytesReceived;
     this.stopwatch = stopwatch.reset().start();
@@ -70,6 +69,11 @@
   }
 
   @Override
+  public void finalize() {
+    this.backlogReader.close();
+  }
+
+  @Override
   public IsBounded isBounded() {
     return IsBounded.UNBOUNDED;
   }
@@ -83,32 +87,32 @@
         position.lastOffset().value(),
         lastClaimed);
     checkArgument(
-        toClaim >= range.getRange().getFrom(),
+        toClaim >= range.getFrom(),
         "Trying to claim offset %s before start of the range %s",
         toClaim,
         range);
     // split() has already been called, truncating this range. No more offsets may be claimed.
-    if (range.getRange().getTo() != Long.MAX_VALUE) {
-      boolean isRangeEmpty = range.getRange().getTo() == range.getRange().getFrom();
-      boolean isValidClosedRange = nextOffset() == range.getRange().getTo();
+    if (range.getTo() != Long.MAX_VALUE) {
+      boolean isRangeEmpty = range.getTo() == range.getFrom();
+      boolean isValidClosedRange = nextOffset() == range.getTo();
       checkState(
           isRangeEmpty || isValidClosedRange,
           "Violated class precondition: offset range improperly split. Please report a beam bug.");
       return false;
     }
     lastClaimed = toClaim;
-    range = OffsetByteRange.of(range.getRange(), range.getByteCount() + position.batchBytes());
+    byteCount += position.batchBytes();
     return true;
   }
 
   @Override
-  public OffsetByteRange currentRestriction() {
+  public OffsetRange currentRestriction() {
     return range;
   }
 
   private long nextOffset() {
     checkState(lastClaimed == null || lastClaimed < Long.MAX_VALUE);
-    return lastClaimed == null ? currentRestriction().getRange().getFrom() : lastClaimed + 1;
+    return lastClaimed == null ? currentRestriction().getFrom() : lastClaimed + 1;
   }
 
   /**
@@ -120,33 +124,29 @@
     if (duration.isLongerThan(minTrackingTime)) {
       return true;
     }
-    if (currentRestriction().getByteCount() >= minBytesReceived) {
+    if (byteCount >= minBytesReceived) {
       return true;
     }
     return false;
   }
 
   @Override
-  public @Nullable SplitResult<OffsetByteRange> trySplit(double fractionOfRemainder) {
+  public @Nullable SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
     // Cannot split a bounded range. This should already be completely claimed.
-    if (range.getRange().getTo() != Long.MAX_VALUE) {
+    if (range.getTo() != Long.MAX_VALUE) {
       return null;
     }
     if (!receivedEnough()) {
       return null;
     }
-    range =
-        OffsetByteRange.of(
-            new OffsetRange(currentRestriction().getRange().getFrom(), nextOffset()),
-            range.getByteCount());
-    return SplitResult.of(
-        this.range, OffsetByteRange.of(new OffsetRange(nextOffset(), Long.MAX_VALUE), 0));
+    range = new OffsetRange(currentRestriction().getFrom(), nextOffset());
+    return SplitResult.of(this.range, new OffsetRange(nextOffset(), Long.MAX_VALUE));
   }
 
   @Override
   @SuppressWarnings("unboxing.of.nullable")
   public void checkDone() throws IllegalStateException {
-    if (range.getRange().getFrom() == range.getRange().getTo()) {
+    if (range.getFrom() == range.getTo()) {
       return;
     }
     checkState(
@@ -155,18 +155,18 @@
         range);
     long lastClaimedNotNull = checkNotNull(lastClaimed);
     checkState(
-        lastClaimedNotNull >= range.getRange().getTo() - 1,
+        lastClaimedNotNull >= range.getTo() - 1,
         "Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted",
         lastClaimedNotNull,
         range,
         lastClaimedNotNull + 1,
-        range.getRange().getTo());
+        range.getTo());
   }
 
   @Override
   public Progress getProgress() {
     ComputeMessageStatsResponse stats =
-        this.unownedBacklogReader.computeMessageStats(Offset.of(nextOffset()));
-    return Progress.from(range.getByteCount(), stats.getMessageBytes());
+        this.backlogReader.computeMessageStats(Offset.of(nextOffset()));
+    return Progress.from(byteCount, stats.getMessageBytes());
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java
index d7526d8..623e20c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java
@@ -27,8 +27,4 @@
   private PerServerPublisherCache() {}
 
   static final PublisherCache PUBLISHER_CACHE = new PublisherCache();
-
-  static {
-    Runtime.getRuntime().addShutdownHook(new Thread(PUBLISHER_CACHE::close));
-  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java
index fdf7920..a9f7a43 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java
@@ -17,12 +17,13 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite;
 
-import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;
+import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
 import com.google.cloud.pubsublite.Offset;
-import com.google.cloud.pubsublite.internal.ExtractStatus;
 import com.google.cloud.pubsublite.internal.wire.Committer;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
+import java.util.concurrent.ExecutionException;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SerializableBiFunction;
@@ -34,35 +35,31 @@
 
 class PerSubscriptionPartitionSdf extends DoFn<SubscriptionPartition, SequencedMessage> {
   private final Duration maxSleepTime;
-  private final ManagedBacklogReaderFactory backlogReaderFactory;
   private final SubscriptionPartitionProcessorFactory processorFactory;
   private final SerializableFunction<SubscriptionPartition, InitialOffsetReader>
       offsetReaderFactory;
-  private final SerializableBiFunction<TopicBacklogReader, OffsetByteRange, TrackerWithProgress>
+  private final SerializableBiFunction<
+          SubscriptionPartition, OffsetRange, RestrictionTracker<OffsetRange, OffsetByteProgress>>
       trackerFactory;
   private final SerializableFunction<SubscriptionPartition, Committer> committerFactory;
 
   PerSubscriptionPartitionSdf(
       Duration maxSleepTime,
-      ManagedBacklogReaderFactory backlogReaderFactory,
       SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory,
-      SerializableBiFunction<TopicBacklogReader, OffsetByteRange, TrackerWithProgress>
+      SerializableBiFunction<
+              SubscriptionPartition,
+              OffsetRange,
+              RestrictionTracker<OffsetRange, OffsetByteProgress>>
           trackerFactory,
       SubscriptionPartitionProcessorFactory processorFactory,
       SerializableFunction<SubscriptionPartition, Committer> committerFactory) {
     this.maxSleepTime = maxSleepTime;
-    this.backlogReaderFactory = backlogReaderFactory;
     this.processorFactory = processorFactory;
     this.offsetReaderFactory = offsetReaderFactory;
     this.trackerFactory = trackerFactory;
     this.committerFactory = committerFactory;
   }
 
-  @Teardown
-  public void teardown() {
-    backlogReaderFactory.close();
-  }
-
   @GetInitialWatermarkEstimatorState
   public Instant getInitialWatermarkState() {
     return Instant.EPOCH;
@@ -75,7 +72,7 @@
 
   @ProcessElement
   public ProcessContinuation processElement(
-      RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
+      RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
       @Element SubscriptionPartition subscriptionPartition,
       OutputReceiver<SequencedMessage> receiver)
       throws Exception {
@@ -86,44 +83,38 @@
       processor
           .lastClaimed()
           .ifPresent(
-              lastClaimedOffset -> {
+              lastClaimedOffset ->
+              /* TODO(boyuanzz): When default dataflow can use finalizers, undo this.
+              finalizer.afterBundleCommit(
+                  Instant.ofEpochMilli(Long.MAX_VALUE),
+                  () -> */ {
                 Committer committer = committerFactory.apply(subscriptionPartition);
                 committer.startAsync().awaitRunning();
                 // Commit the next-to-deliver offset.
                 try {
                   committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();
+                } catch (ExecutionException e) {
+                  throw toCanonical(checkArgumentNotNull(e.getCause())).underlying;
                 } catch (Exception e) {
-                  throw ExtractStatus.toCanonical(e).underlying;
+                  throw toCanonical(e).underlying;
                 }
-                blockingShutdown(committer);
+                committer.stopAsync().awaitTerminated();
               });
       return result;
     }
   }
 
   @GetInitialRestriction
-  public OffsetByteRange getInitialRestriction(
-      @Element SubscriptionPartition subscriptionPartition) {
+  public OffsetRange getInitialRestriction(@Element SubscriptionPartition subscriptionPartition) {
     try (InitialOffsetReader reader = offsetReaderFactory.apply(subscriptionPartition)) {
       Offset offset = reader.read();
-      return OffsetByteRange.of(
-          new OffsetRange(offset.value(), Long.MAX_VALUE /* open interval */));
+      return new OffsetRange(offset.value(), Long.MAX_VALUE /* open interval */);
     }
   }
 
   @NewTracker
-  public TrackerWithProgress newTracker(
-      @Element SubscriptionPartition subscriptionPartition, @Restriction OffsetByteRange range) {
-    return trackerFactory.apply(backlogReaderFactory.newReader(subscriptionPartition), range);
-  }
-
-  @GetSize
-  public double getSize(
-      @Element SubscriptionPartition subscriptionPartition,
-      @Restriction OffsetByteRange restriction) {
-    if (restriction.getRange().getTo() != Long.MAX_VALUE) {
-      return restriction.getByteCount();
-    }
-    return newTracker(subscriptionPartition, restriction).getProgress().getWorkRemaining();
+  public RestrictionTracker<OffsetRange, OffsetByteProgress> newTracker(
+      @Element SubscriptionPartition subscriptionPartition, @Restriction OffsetRange range) {
+    return trackerFactory.apply(subscriptionPartition, range);
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java
index 3dbdec6..f8dc24b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java
@@ -23,50 +23,52 @@
 import com.google.api.core.ApiService.State;
 import com.google.api.gax.rpc.ApiException;
 import com.google.cloud.pubsublite.MessageMetadata;
+import com.google.cloud.pubsublite.internal.CloseableMonitor;
 import com.google.cloud.pubsublite.internal.Publisher;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import java.util.HashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 
 /** A map of working publishers by PublisherOptions. */
-class PublisherCache implements AutoCloseable {
-  @GuardedBy("this")
+class PublisherCache {
+  private final CloseableMonitor monitor = new CloseableMonitor();
+
+  private final Executor listenerExecutor = Executors.newSingleThreadExecutor();
+
+  @GuardedBy("monitor.monitor")
   private final HashMap<PublisherOptions, Publisher<MessageMetadata>> livePublishers =
       new HashMap<>();
 
-  private synchronized void evict(PublisherOptions options) {
-    livePublishers.remove(options);
-  }
-
-  synchronized Publisher<MessageMetadata> get(PublisherOptions options) throws ApiException {
+  Publisher<MessageMetadata> get(PublisherOptions options) throws ApiException {
     checkArgument(options.usesCache());
-    Publisher<MessageMetadata> publisher = livePublishers.get(options);
-    if (publisher != null) {
+    try (CloseableMonitor.Hold h = monitor.enter()) {
+      Publisher<MessageMetadata> publisher = livePublishers.get(options);
+      if (publisher != null) {
+        return publisher;
+      }
+      publisher = Publishers.newPublisher(options);
+      livePublishers.put(options, publisher);
+      publisher.addListener(
+          new Listener() {
+            @Override
+            public void failed(State s, Throwable t) {
+              try (CloseableMonitor.Hold h = monitor.enter()) {
+                livePublishers.remove(options);
+              }
+            }
+          },
+          listenerExecutor);
+      publisher.startAsync().awaitRunning();
       return publisher;
     }
-    publisher = Publishers.newPublisher(options);
-    livePublishers.put(options, publisher);
-    publisher.addListener(
-        new Listener() {
-          @Override
-          public void failed(State s, Throwable t) {
-            evict(options);
-          }
-        },
-        SystemExecutors.getFuturesExecutor());
-    publisher.startAsync().awaitRunning();
-    return publisher;
   }
 
   @VisibleForTesting
-  synchronized void set(PublisherOptions options, Publisher<MessageMetadata> toCache) {
-    livePublishers.put(options, toCache);
-  }
-
-  @Override
-  public synchronized void close() {
-    livePublishers.forEach(((options, publisher) -> publisher.stopAsync()));
-    livePublishers.clear();
+  void set(PublisherOptions options, Publisher<MessageMetadata> toCache) {
+    try (CloseableMonitor.Hold h = monitor.enter()) {
+      livePublishers.put(options, toCache);
+    }
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java
index 67ea6cf..34012f7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java
@@ -17,27 +17,17 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite;
 
-import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
 import static com.google.cloud.pubsublite.internal.UncheckedApiPreconditions.checkArgument;
-import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
-import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
 
 import com.google.api.gax.rpc.ApiException;
 import com.google.cloud.pubsublite.AdminClient;
 import com.google.cloud.pubsublite.AdminClientSettings;
 import com.google.cloud.pubsublite.MessageMetadata;
-import com.google.cloud.pubsublite.Partition;
-import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
+import com.google.cloud.pubsublite.TopicPath;
 import com.google.cloud.pubsublite.internal.Publisher;
 import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings;
-import com.google.cloud.pubsublite.internal.wire.PubsubContext;
 import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
-import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
 import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
-import com.google.cloud.pubsublite.v1.AdminServiceClient;
-import com.google.cloud.pubsublite.v1.AdminServiceSettings;
-import com.google.cloud.pubsublite.v1.PublisherServiceClient;
-import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken;
 
 class Publishers {
@@ -45,38 +35,6 @@
 
   private Publishers() {}
 
-  private static AdminClient newAdminClient(PublisherOptions options) throws ApiException {
-    try {
-      return AdminClient.create(
-          AdminClientSettings.newBuilder()
-              .setServiceClient(
-                  AdminServiceClient.create(
-                      addDefaultSettings(
-                          options.topicPath().location().extractRegion(),
-                          AdminServiceSettings.newBuilder())))
-              .setRegion(options.topicPath().location().extractRegion())
-              .build());
-    } catch (Throwable t) {
-      throw toCanonical(t).underlying;
-    }
-  }
-
-  private static PublisherServiceClient newServiceClient(
-      PublisherOptions options, Partition partition) {
-    PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder();
-    settingsBuilder =
-        addDefaultMetadata(
-            PubsubContext.of(FRAMEWORK),
-            RoutingMetadata.of(options.topicPath(), partition),
-            settingsBuilder);
-    try {
-      return PublisherServiceClient.create(
-          addDefaultSettings(options.topicPath().location().extractRegion(), settingsBuilder));
-    } catch (Throwable t) {
-      throw toCanonical(t).underlying;
-    }
-  }
-
   @SuppressWarnings("unchecked")
   static Publisher<MessageMetadata> newPublisher(PublisherOptions options) throws ApiException {
     SerializableSupplier<Object> supplier = options.publisherSupplier();
@@ -86,18 +44,20 @@
       checkArgument(token.isSupertypeOf(supplied.getClass()));
       return (Publisher<MessageMetadata>) supplied;
     }
-    return PartitionCountWatchingPublisherSettings.newBuilder()
-        .setTopic(options.topicPath())
-        .setPublisherFactory(
-            partition ->
-                SinglePartitionPublisherBuilder.newBuilder()
-                    .setTopic(options.topicPath())
-                    .setPartition(partition)
-                    .setServiceClient(newServiceClient(options, partition))
-                    .setBatchingSettings(PublisherSettings.DEFAULT_BATCHING_SETTINGS)
-                    .build())
-        .setAdminClient(newAdminClient(options))
-        .build()
-        .instantiate();
+
+    TopicPath topic = options.topicPath();
+    PartitionCountWatchingPublisherSettings.Builder publisherSettings =
+        PartitionCountWatchingPublisherSettings.newBuilder()
+            .setTopic(topic)
+            .setPublisherFactory(
+                partition ->
+                    SinglePartitionPublisherBuilder.newBuilder()
+                        .setTopic(topic)
+                        .setPartition(partition)
+                        .build())
+            .setAdminClient(
+                AdminClient.create(
+                    AdminClientSettings.newBuilder().setRegion(topic.location().region()).build()));
+    return publisherSettings.build().instantiate();
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java
index b93ac61..ca1f2be 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java
@@ -107,7 +107,7 @@
    * }</pre>
    */
   public static PTransform<PCollection<PubSubMessage>, PDone> write(PublisherOptions options) {
-    return new PTransform<PCollection<PubSubMessage>, PDone>() {
+    return new PTransform<PCollection<PubSubMessage>, PDone>("PubsubLiteIO") {
       @Override
       public PDone expand(PCollection<PubSubMessage> input) {
         PubsubLiteSink sink = new PubsubLiteSink(options);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java
index d0e3afa..d3acdfa 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java
@@ -28,14 +28,16 @@
 import com.google.cloud.pubsublite.internal.CheckedApiException;
 import com.google.cloud.pubsublite.internal.ExtractStatus;
 import com.google.cloud.pubsublite.internal.Publisher;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
 import com.google.cloud.pubsublite.proto.PubSubMessage;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import java.util.ArrayDeque;
 import java.util.Deque;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.function.Consumer;
 import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOrError.Kind;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
 
 /** A sink which publishes messages to Pub/Sub Lite. */
 @SuppressWarnings({
@@ -54,6 +56,8 @@
   @GuardedBy("this")
   private transient Deque<CheckedApiException> errorsSinceLastFinish;
 
+  private static final Executor executor = Executors.newCachedThreadPool();
+
   PubsubLiteSink(PublisherOptions options) {
     this.options = options;
   }
@@ -85,7 +89,7 @@
             onFailure.accept(t);
           }
         },
-        SystemExecutors.getFuturesExecutor());
+        MoreExecutors.directExecutor());
     if (!options.usesCache()) {
       publisher.startAsync();
     }
@@ -126,7 +130,7 @@
             onFailure.accept(t);
           }
         },
-        SystemExecutors.getFuturesExecutor());
+        executor);
   }
 
   // Intentionally don't flush on bundle finish to allow multi-sink client reuse.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java
index b6a9f5d..9875880 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java
@@ -23,7 +23,6 @@
 import com.google.api.gax.rpc.ApiException;
 import com.google.cloud.pubsublite.AdminClient;
 import com.google.cloud.pubsublite.AdminClientSettings;
-import com.google.cloud.pubsublite.Offset;
 import com.google.cloud.pubsublite.Partition;
 import com.google.cloud.pubsublite.TopicPath;
 import com.google.cloud.pubsublite.internal.wire.Committer;
@@ -32,6 +31,8 @@
 import java.util.List;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -53,11 +54,10 @@
     checkArgument(subscriptionPartition.subscription().equals(options.subscriptionPath()));
   }
 
-  private Subscriber newSubscriber(
-      Partition partition, Offset initialOffset, Consumer<List<SequencedMessage>> consumer) {
+  private Subscriber newSubscriber(Partition partition, Consumer<List<SequencedMessage>> consumer) {
     try {
       return options
-          .getSubscriberFactory(partition, initialOffset)
+          .getSubscriberFactory(partition)
           .newSubscriber(
               messages ->
                   consumer.accept(
@@ -71,31 +71,23 @@
 
   private SubscriptionPartitionProcessor newPartitionProcessor(
       SubscriptionPartition subscriptionPartition,
-      RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
+      RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
       OutputReceiver<SequencedMessage> receiver)
       throws ApiException {
     checkSubscription(subscriptionPartition);
     return new SubscriptionPartitionProcessorImpl(
         tracker,
         receiver,
-        consumer ->
-            newSubscriber(
-                subscriptionPartition.partition(),
-                Offset.of(tracker.currentRestriction().getRange().getFrom()),
-                consumer),
+        consumer -> newSubscriber(subscriptionPartition.partition(), consumer),
         options.flowControlSettings());
   }
 
-  private TopicBacklogReader newBacklogReader(SubscriptionPartition subscriptionPartition) {
+  private RestrictionTracker<OffsetRange, OffsetByteProgress> newRestrictionTracker(
+      SubscriptionPartition subscriptionPartition, OffsetRange initial) {
     checkSubscription(subscriptionPartition);
-    return options.getBacklogReader(subscriptionPartition.partition());
-  }
-
-  private TrackerWithProgress newRestrictionTracker(
-      TopicBacklogReader backlogReader, OffsetByteRange initial) {
     return new OffsetByteRangeTracker(
         initial,
-        backlogReader,
+        options.getBacklogReader(subscriptionPartition.partition()),
         Stopwatch.createUnstarted(),
         options.minBundleTimeout(),
         LongMath.saturatedMultiply(options.flowControlSettings().bytesOutstanding(), 10));
@@ -115,7 +107,7 @@
     try (AdminClient admin =
         AdminClient.create(
             AdminClientSettings.newBuilder()
-                .setRegion(options.subscriptionPath().location().extractRegion())
+                .setRegion(options.subscriptionPath().location().region())
                 .build())) {
       return TopicPath.parse(admin.getSubscription(options.subscriptionPath()).get().getTopic());
     } catch (Throwable t) {
@@ -126,15 +118,25 @@
   @Override
   public PCollection<SequencedMessage> expand(PBegin input) {
     PCollection<SubscriptionPartition> subscriptionPartitions;
-    subscriptionPartitions =
-        input.apply(new SubscriptionPartitionLoader(getTopicPath(), options.subscriptionPath()));
+    if (options.partitions().isEmpty()) {
+      subscriptionPartitions =
+          input.apply(new SubscriptionPartitionLoader(getTopicPath(), options.subscriptionPath()));
+    } else {
+      subscriptionPartitions =
+          input.apply(
+              Create.of(
+                  options.partitions().stream()
+                      .map(
+                          partition ->
+                              SubscriptionPartition.of(options.subscriptionPath(), partition))
+                      .collect(Collectors.toList())));
+    }
 
     return subscriptionPartitions.apply(
         ParDo.of(
             new PerSubscriptionPartitionSdf(
                 // Ensure we read for at least 5 seconds more than the bundle timeout.
                 options.minBundleTimeout().plus(Duration.standardSeconds(5)),
-                new ManagedBacklogReaderFactoryImpl(this::newBacklogReader),
                 this::newInitialOffsetReader,
                 this::newRestrictionTracker,
                 this::newPartitionProcessor,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java
index a9625be..0d3afe2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java
@@ -23,7 +23,6 @@
 
 import com.google.api.gax.rpc.ApiException;
 import com.google.auto.value.AutoValue;
-import com.google.cloud.pubsublite.Offset;
 import com.google.cloud.pubsublite.Partition;
 import com.google.cloud.pubsublite.SubscriptionPath;
 import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
@@ -36,13 +35,13 @@
 import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
 import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
 import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
-import com.google.cloud.pubsublite.proto.Cursor;
-import com.google.cloud.pubsublite.proto.SeekRequest;
 import com.google.cloud.pubsublite.v1.CursorServiceClient;
 import com.google.cloud.pubsublite.v1.CursorServiceSettings;
 import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
 import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
 import java.io.Serializable;
+import java.util.Set;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 
@@ -70,6 +69,11 @@
   public abstract FlowControlSettings flowControlSettings();
 
   /**
+   * A set of partitions. If empty, continuously poll the set of partitions using an admin client.
+   */
+  public abstract Set<Partition> partitions();
+
+  /**
    * The minimum wall time to pass before allowing bundle closure.
    *
    * <p>Setting this to too small of a value will result in increased compute costs and lower
@@ -104,6 +108,7 @@
   public static Builder newBuilder() {
     Builder builder = new AutoValue_SubscriberOptions.Builder();
     return builder
+        .setPartitions(ImmutableSet.of())
         .setFlowControlSettings(DEFAULT_FLOW_CONTROL)
         .setMinBundleTimeout(MIN_BUNDLE_TIMEOUT);
   }
@@ -114,19 +119,20 @@
       throws ApiException {
     try {
       SubscriberServiceSettings.Builder settingsBuilder = SubscriberServiceSettings.newBuilder();
+
       settingsBuilder =
           addDefaultMetadata(
               PubsubContext.of(FRAMEWORK),
               RoutingMetadata.of(subscriptionPath(), partition),
               settingsBuilder);
       return SubscriberServiceClient.create(
-          addDefaultSettings(subscriptionPath().location().extractRegion(), settingsBuilder));
+          addDefaultSettings(subscriptionPath().location().region(), settingsBuilder));
     } catch (Throwable t) {
       throw toCanonical(t).underlying;
     }
   }
 
-  SubscriberFactory getSubscriberFactory(Partition partition, Offset initialOffset) {
+  SubscriberFactory getSubscriberFactory(Partition partition) {
     SubscriberFactory factory = subscriberFactory();
     if (factory != null) {
       return factory;
@@ -137,10 +143,6 @@
             .setSubscriptionPath(subscriptionPath())
             .setPartition(partition)
             .setServiceClient(newSubscriberServiceClient(partition))
-            .setInitialLocation(
-                SeekRequest.newBuilder()
-                    .setCursor(Cursor.newBuilder().setOffset(initialOffset.value()))
-                    .build())
             .build();
   }
 
@@ -148,7 +150,7 @@
     try {
       return CursorServiceClient.create(
           addDefaultSettings(
-              subscriptionPath().location().extractRegion(), CursorServiceSettings.newBuilder()));
+              subscriptionPath().location().region(), CursorServiceSettings.newBuilder()));
     } catch (Throwable t) {
       throw toCanonical(t).underlying;
     }
@@ -187,7 +189,7 @@
     return new InitialOffsetReaderImpl(
         CursorClient.create(
             CursorClientSettings.newBuilder()
-                .setRegion(subscriptionPath().location().extractRegion())
+                .setRegion(subscriptionPath().location().region())
                 .build()),
         subscriptionPath(),
         partition);
@@ -199,6 +201,8 @@
     public abstract Builder setSubscriptionPath(SubscriptionPath path);
 
     // Optional parameters
+    public abstract Builder setPartitions(Set<Partition> partitions);
+
     public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);
 
     public abstract Builder setMinBundleTimeout(Duration minBundleTimeout);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java
index 530c180..6bf3623 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java
@@ -19,6 +19,7 @@
 
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import java.io.Serializable;
+import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 
@@ -27,6 +28,6 @@
 
   SubscriptionPartitionProcessor newProcessor(
       SubscriptionPartition subscriptionPartition,
-      RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
+      RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
       OutputReceiver<SequencedMessage> receiver);
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java
index a086d18..8d2a137 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite;
 
-import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;
-
 import com.google.api.core.ApiService.Listener;
 import com.google.api.core.ApiService.State;
 import com.google.cloud.pubsublite.Offset;
@@ -26,8 +24,9 @@
 import com.google.cloud.pubsublite.internal.CheckedApiException;
 import com.google.cloud.pubsublite.internal.ExtractStatus;
 import com.google.cloud.pubsublite.internal.wire.Subscriber;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
+import com.google.cloud.pubsublite.proto.Cursor;
 import com.google.cloud.pubsublite.proto.FlowControlRequest;
+import com.google.cloud.pubsublite.proto.SeekRequest;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.protobuf.util.Timestamps;
 import java.util.List;
@@ -37,17 +36,19 @@
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 class SubscriptionPartitionProcessorImpl extends Listener
     implements SubscriptionPartitionProcessor {
-  private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;
+  private final RestrictionTracker<OffsetRange, OffsetByteProgress> tracker;
   private final OutputReceiver<SequencedMessage> receiver;
   private final Subscriber subscriber;
   private final SettableFuture<Void> completionFuture = SettableFuture.create();
@@ -56,7 +57,7 @@
 
   @SuppressWarnings("methodref.receiver.bound.invalid")
   SubscriptionPartitionProcessorImpl(
-      RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
+      RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
       OutputReceiver<SequencedMessage> receiver,
       Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory,
       FlowControlSettings flowControlSettings) {
@@ -69,15 +70,23 @@
   @Override
   @SuppressWarnings("argument.type.incompatible")
   public void start() throws CheckedApiException {
-    this.subscriber.addListener(this, SystemExecutors.getFuturesExecutor());
+    this.subscriber.addListener(this, MoreExecutors.directExecutor());
     this.subscriber.startAsync();
     this.subscriber.awaitRunning();
     try {
+      this.subscriber
+          .seek(
+              SeekRequest.newBuilder()
+                  .setCursor(Cursor.newBuilder().setOffset(tracker.currentRestriction().getFrom()))
+                  .build())
+          .get();
       this.subscriber.allowFlow(
           FlowControlRequest.newBuilder()
               .setAllowedBytes(flowControlSettings.bytesOutstanding())
               .setAllowedMessages(flowControlSettings.messagesOutstanding())
               .build());
+    } catch (ExecutionException e) {
+      throw ExtractStatus.toCanonical(e.getCause());
     } catch (Throwable t) {
       throw ExtractStatus.toCanonical(t);
     }
@@ -116,7 +125,7 @@
 
   @Override
   public void close() {
-    blockingShutdown(subscriber);
+    subscriber.stopAsync().awaitTerminated();
   }
 
   @Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java
index 79db0f1..8c1dd94 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java
@@ -62,7 +62,7 @@
       try (AdminClient adminClient =
           AdminClient.create(
               AdminClientSettings.newBuilder()
-                  .setRegion(subscriptionPath.location().extractRegion())
+                  .setRegion(subscriptionPath.location().region())
                   .build())) {
         return setTopicPath(
             TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic()));
@@ -81,9 +81,7 @@
 
   TopicBacklogReader instantiate() throws ApiException {
     TopicStatsClientSettings settings =
-        TopicStatsClientSettings.newBuilder()
-            .setRegion(topicPath().location().extractRegion())
-            .build();
+        TopicStatsClientSettings.newBuilder().setRegion(topicPath().location().region()).build();
     TopicBacklogReader impl =
         new TopicBacklogReaderImpl(TopicStatsClient.create(settings), topicPath(), partition());
     return new LimitingTopicBacklogReader(impl, Ticker.systemTicker());
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TrackerWithProgress.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TrackerWithProgress.java
deleted file mode 100644
index 7f0d030..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TrackerWithProgress.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
-
-public abstract class TrackerWithProgress
-    extends RestrictionTracker<OffsetByteRange, OffsetByteProgress> implements HasProgress {}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java
index 5a31f4f..f34ebb6 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java
@@ -49,7 +49,7 @@
   private static final double IGNORED_FRACTION = -10000000.0;
   private static final long MIN_BYTES = 1000;
   private static final OffsetRange RANGE = new OffsetRange(123L, Long.MAX_VALUE);
-  private final TopicBacklogReader unownedBacklogReader = mock(TopicBacklogReader.class);
+  private final TopicBacklogReader reader = mock(TopicBacklogReader.class);
 
   @Spy Ticker ticker;
   private OffsetByteRangeTracker tracker;
@@ -60,18 +60,14 @@
     when(ticker.read()).thenReturn(0L);
     tracker =
         new OffsetByteRangeTracker(
-            OffsetByteRange.of(RANGE, 0),
-            unownedBacklogReader,
-            Stopwatch.createUnstarted(ticker),
-            Duration.millis(500),
-            MIN_BYTES);
+            RANGE, reader, Stopwatch.createUnstarted(ticker), Duration.millis(500), MIN_BYTES);
   }
 
   @Test
   public void progressTracked() {
     assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(123), 10)));
     assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(124), 11)));
-    when(unownedBacklogReader.computeMessageStats(Offset.of(125)))
+    when(reader.computeMessageStats(Offset.of(125)))
         .thenReturn(ComputeMessageStatsResponse.newBuilder().setMessageBytes(1000).build());
     Progress progress = tracker.getProgress();
     assertEquals(21, progress.getWorkCompleted(), .0001);
@@ -80,7 +76,7 @@
 
   @Test
   public void getProgressStatsFailure() {
-    when(unownedBacklogReader.computeMessageStats(Offset.of(123)))
+    when(reader.computeMessageStats(Offset.of(123)))
         .thenThrow(new CheckedApiException(Code.INTERNAL).underlying);
     assertThrows(ApiException.class, tracker::getProgress);
   }
@@ -90,15 +86,11 @@
   public void claimSplitSuccess() {
     assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_000), MIN_BYTES)));
     assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(10_000), MIN_BYTES)));
-    SplitResult<OffsetByteRange> splits = tracker.trySplit(IGNORED_FRACTION);
-    OffsetByteRange primary = splits.getPrimary();
-    assertEquals(RANGE.getFrom(), primary.getRange().getFrom());
-    assertEquals(10_001, primary.getRange().getTo());
-    assertEquals(MIN_BYTES * 2, primary.getByteCount());
-    OffsetByteRange residual = splits.getResidual();
-    assertEquals(10_001, residual.getRange().getFrom());
-    assertEquals(Long.MAX_VALUE, residual.getRange().getTo());
-    assertEquals(0, residual.getByteCount());
+    SplitResult<OffsetRange> splits = tracker.trySplit(IGNORED_FRACTION);
+    assertEquals(RANGE.getFrom(), splits.getPrimary().getFrom());
+    assertEquals(10_001, splits.getPrimary().getTo());
+    assertEquals(10_001, splits.getResidual().getFrom());
+    assertEquals(Long.MAX_VALUE, splits.getResidual().getTo());
     assertEquals(splits.getPrimary(), tracker.currentRestriction());
     tracker.checkDone();
     assertNull(tracker.trySplit(IGNORED_FRACTION));
@@ -108,10 +100,10 @@
   @SuppressWarnings({"dereference.of.nullable", "argument.type.incompatible"})
   public void splitWithoutClaimEmpty() {
     when(ticker.read()).thenReturn(100000000000000L);
-    SplitResult<OffsetByteRange> splits = tracker.trySplit(IGNORED_FRACTION);
-    assertEquals(RANGE.getFrom(), splits.getPrimary().getRange().getFrom());
-    assertEquals(RANGE.getFrom(), splits.getPrimary().getRange().getTo());
-    assertEquals(RANGE, splits.getResidual().getRange());
+    SplitResult<OffsetRange> splits = tracker.trySplit(IGNORED_FRACTION);
+    assertEquals(RANGE.getFrom(), splits.getPrimary().getFrom());
+    assertEquals(RANGE.getFrom(), splits.getPrimary().getTo());
+    assertEquals(RANGE, splits.getResidual());
     assertEquals(splits.getPrimary(), tracker.currentRestriction());
     tracker.checkDone();
     assertNull(tracker.trySplit(IGNORED_FRACTION));
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java
index 0a4e3e7..598037e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java
@@ -28,7 +28,6 @@
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
 import static org.mockito.MockitoAnnotations.initMocks;
 
@@ -52,8 +51,6 @@
 import org.apache.beam.sdk.transforms.SerializableBiFunction;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.DoubleMath;
 import org.joda.time.Duration;
 import org.junit.Before;
 import org.junit.Test;
@@ -68,24 +65,22 @@
 public class PerSubscriptionPartitionSdfTest {
   private static final Duration MAX_SLEEP_TIME =
       Duration.standardMinutes(10).plus(Duration.millis(10));
-  private static final OffsetByteRange RESTRICTION =
-      OffsetByteRange.of(new OffsetRange(1, Long.MAX_VALUE), 0);
+  private static final OffsetRange RESTRICTION = new OffsetRange(1, Long.MAX_VALUE);
   private static final SubscriptionPartition PARTITION =
       SubscriptionPartition.of(example(SubscriptionPath.class), example(Partition.class));
 
   @Mock SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory;
 
-  @Mock ManagedBacklogReaderFactory backlogReaderFactory;
-  @Mock TopicBacklogReader backlogReader;
-
   @Mock
-  SerializableBiFunction<TopicBacklogReader, OffsetByteRange, TrackerWithProgress> trackerFactory;
+  SerializableBiFunction<
+          SubscriptionPartition, OffsetRange, RestrictionTracker<OffsetRange, OffsetByteProgress>>
+      trackerFactory;
 
   @Mock SubscriptionPartitionProcessorFactory processorFactory;
   @Mock SerializableFunction<SubscriptionPartition, Committer> committerFactory;
 
   @Mock InitialOffsetReader initialOffsetReader;
-  @Spy TrackerWithProgress tracker;
+  @Spy RestrictionTracker<OffsetRange, OffsetByteProgress> tracker;
   @Mock OutputReceiver<SequencedMessage> output;
   @Mock SubscriptionPartitionProcessor processor;
 
@@ -103,11 +98,9 @@
     when(trackerFactory.apply(any(), any())).thenReturn(tracker);
     when(committerFactory.apply(any())).thenReturn(committer);
     when(tracker.currentRestriction()).thenReturn(RESTRICTION);
-    when(backlogReaderFactory.newReader(any())).thenReturn(backlogReader);
     sdf =
         new PerSubscriptionPartitionSdf(
             MAX_SLEEP_TIME,
-            backlogReaderFactory,
             offsetReaderFactory,
             trackerFactory,
             processorFactory,
@@ -117,10 +110,9 @@
   @Test
   public void getInitialRestrictionReadSuccess() {
     when(initialOffsetReader.read()).thenReturn(example(Offset.class));
-    OffsetByteRange range = sdf.getInitialRestriction(PARTITION);
-    assertEquals(example(Offset.class).value(), range.getRange().getFrom());
-    assertEquals(Long.MAX_VALUE, range.getRange().getTo());
-    assertEquals(0, range.getByteCount());
+    OffsetRange range = sdf.getInitialRestriction(PARTITION);
+    assertEquals(example(Offset.class).value(), range.getFrom());
+    assertEquals(Long.MAX_VALUE, range.getTo());
     verify(offsetReaderFactory).apply(PARTITION);
   }
 
@@ -133,13 +125,7 @@
   @Test
   public void newTrackerCallsFactory() {
     assertSame(tracker, sdf.newTracker(PARTITION, RESTRICTION));
-    verify(trackerFactory).apply(backlogReader, RESTRICTION);
-  }
-
-  @Test
-  public void tearDownClosesBacklogReaderFactory() {
-    sdf.teardown();
-    verify(backlogReaderFactory).close();
+    verify(trackerFactory).apply(PARTITION, RESTRICTION);
   }
 
   @Test
@@ -173,48 +159,12 @@
     order2.verify(committer).awaitTerminated();
   }
 
-  private static final class NoopManagedBacklogReaderFactory
-      implements ManagedBacklogReaderFactory {
-    @Override
-    public TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition) {
-      return null;
-    }
-
-    @Override
-    public void close() {}
-  }
-
   @Test
   @SuppressWarnings("return.type.incompatible")
   public void dofnIsSerializable() throws Exception {
     ObjectOutputStream output = new ObjectOutputStream(new ByteArrayOutputStream());
     output.writeObject(
         new PerSubscriptionPartitionSdf(
-            MAX_SLEEP_TIME,
-            new NoopManagedBacklogReaderFactory(),
-            x -> null,
-            (x, y) -> null,
-            (x, y, z) -> null,
-            (x) -> null));
-  }
-
-  @Test
-  public void getProgressUnboundedRangeDelegates() {
-    Progress progress = Progress.from(0, 0.2);
-    when(tracker.getProgress()).thenReturn(progress);
-    assertTrue(
-        DoubleMath.fuzzyEquals(
-            progress.getWorkRemaining(), sdf.getSize(PARTITION, RESTRICTION), .0001));
-    verify(tracker).getProgress();
-  }
-
-  @Test
-  public void getProgressBoundedReturnsBytes() {
-    assertTrue(
-        DoubleMath.fuzzyEquals(
-            123.0,
-            sdf.getSize(PARTITION, OffsetByteRange.of(new OffsetRange(87, 8000), 123)),
-            .0001));
-    verifyNoInteractions(tracker);
+            MAX_SLEEP_TIME, x -> null, (x, y) -> null, (x, y, z) -> null, (x) -> null));
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
deleted file mode 100644
index e242942..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
-import static org.junit.Assert.fail;
-
-import com.google.cloud.pubsublite.AdminClient;
-import com.google.cloud.pubsublite.AdminClientSettings;
-import com.google.cloud.pubsublite.BacklogLocation;
-import com.google.cloud.pubsublite.CloudZone;
-import com.google.cloud.pubsublite.Message;
-import com.google.cloud.pubsublite.ProjectId;
-import com.google.cloud.pubsublite.SubscriptionName;
-import com.google.cloud.pubsublite.SubscriptionPath;
-import com.google.cloud.pubsublite.TopicName;
-import com.google.cloud.pubsublite.TopicPath;
-import com.google.cloud.pubsublite.proto.PubSubMessage;
-import com.google.cloud.pubsublite.proto.SequencedMessage;
-import com.google.cloud.pubsublite.proto.Subscription;
-import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig.DeliveryRequirement;
-import com.google.cloud.pubsublite.proto.Topic;
-import com.google.cloud.pubsublite.proto.Topic.PartitionConfig.Capacity;
-import com.google.errorprone.annotations.concurrent.GuardedBy;
-import com.google.protobuf.ByteString;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.FlatMapElements;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.joda.time.Duration;
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@RunWith(JUnit4.class)
-public class ReadWriteIT {
-  private static final Logger LOG = LoggerFactory.getLogger(ReadWriteIT.class);
-  private static final CloudZone ZONE = CloudZone.parse("us-central1-b");
-  private static final int MESSAGE_COUNT = 90;
-
-  @Rule public transient TestPipeline pipeline = TestPipeline.create();
-
-  private static ProjectId getProject(PipelineOptions options) {
-    return ProjectId.of(checkArgumentNotNull(options.as(GcpOptions.class).getProject()));
-  }
-
-  private static String randomName() {
-    return "beam_it_resource_" + ThreadLocalRandom.current().nextLong();
-  }
-
-  private static AdminClient newAdminClient() {
-    return AdminClient.create(AdminClientSettings.newBuilder().setRegion(ZONE.region()).build());
-  }
-
-  private final Deque<Runnable> cleanupActions = new ArrayDeque<>();
-
-  private TopicPath createTopic(ProjectId id) throws Exception {
-    TopicPath toReturn =
-        TopicPath.newBuilder()
-            .setProject(id)
-            .setLocation(ZONE)
-            .setName(TopicName.of(randomName()))
-            .build();
-    Topic.Builder topic = Topic.newBuilder().setName(toReturn.toString());
-    topic
-        .getPartitionConfigBuilder()
-        .setCount(2)
-        .setCapacity(Capacity.newBuilder().setPublishMibPerSec(4).setSubscribeMibPerSec(4));
-    topic.getRetentionConfigBuilder().setPerPartitionBytes(30 * (1L << 30));
-    cleanupActions.addLast(
-        () -> {
-          try (AdminClient client = newAdminClient()) {
-            client.deleteTopic(toReturn).get();
-          } catch (Throwable t) {
-            LOG.error("Failed to clean up topic.", t);
-          }
-        });
-    try (AdminClient client = newAdminClient()) {
-      client.createTopic(topic.build()).get();
-    }
-    return toReturn;
-  }
-
-  private SubscriptionPath createSubscription(TopicPath topic) throws Exception {
-    SubscriptionPath toReturn =
-        SubscriptionPath.newBuilder()
-            .setProject(topic.project())
-            .setLocation(ZONE)
-            .setName(SubscriptionName.of(randomName()))
-            .build();
-    Subscription.Builder subscription = Subscription.newBuilder().setName(toReturn.toString());
-    subscription
-        .getDeliveryConfigBuilder()
-        .setDeliveryRequirement(DeliveryRequirement.DELIVER_IMMEDIATELY);
-    subscription.setTopic(topic.toString());
-    cleanupActions.addLast(
-        () -> {
-          try (AdminClient client = newAdminClient()) {
-            client.deleteSubscription(toReturn).get();
-          } catch (Throwable t) {
-            LOG.error("Failed to clean up subscription.", t);
-          }
-        });
-    try (AdminClient client = newAdminClient()) {
-      client.createSubscription(subscription.build(), BacklogLocation.BEGINNING).get();
-    }
-    return toReturn;
-  }
-
-  @After
-  public void tearDown() {
-    while (!cleanupActions.isEmpty()) {
-      cleanupActions.removeLast().run();
-    }
-  }
-
-  // Workaround for BEAM-12867
-  // TODO(BEAM-12867): Remove this.
-  private static class CustomCreate extends PTransform<PCollection<Void>, PCollection<Integer>> {
-    @Override
-    public PCollection<Integer> expand(PCollection<Void> input) {
-      return input.apply(
-          "createIndexes",
-          FlatMapElements.via(
-              new SimpleFunction<Void, Iterable<Integer>>() {
-                @Override
-                public Iterable<Integer> apply(Void input) {
-                  return IntStream.range(0, MESSAGE_COUNT).boxed().collect(Collectors.toList());
-                }
-              }));
-    }
-  }
-
-  public static void writeMessages(TopicPath topicPath, Pipeline pipeline) {
-    PCollection<Void> trigger = pipeline.apply(Create.of((Void) null));
-    PCollection<Integer> indexes = trigger.apply("createIndexes", new CustomCreate());
-    PCollection<PubSubMessage> messages =
-        indexes.apply(
-            "createMessages",
-            MapElements.via(
-                new SimpleFunction<Integer, PubSubMessage>(
-                    index ->
-                        Message.builder()
-                            .setData(ByteString.copyFromUtf8(index.toString()))
-                            .build()
-                            .toProto()) {}));
-    // Add UUIDs to messages for later deduplication.
-    messages = messages.apply("addUuids", PubsubLiteIO.addUuids());
-    messages.apply(
-        "writeMessages",
-        PubsubLiteIO.write(PublisherOptions.newBuilder().setTopicPath(topicPath).build()));
-  }
-
-  public static PCollection<SequencedMessage> readMessages(
-      SubscriptionPath subscriptionPath, Pipeline pipeline) {
-    PCollection<SequencedMessage> messages =
-        pipeline.apply(
-            "readMessages",
-            PubsubLiteIO.read(
-                SubscriberOptions.newBuilder()
-                    .setSubscriptionPath(subscriptionPath)
-                    // setMinBundleTimeout INTENDED FOR TESTING ONLY
-                    // This sacrifices efficiency to make tests run faster. Do not use this in a
-                    // real pipeline!
-                    .setMinBundleTimeout(Duration.standardSeconds(5))
-                    .build()));
-    // Deduplicate messages based on the uuids added in PubsubLiteIO.addUuids() when writing.
-    return messages.apply(
-        "dedupeMessages", PubsubLiteIO.deduplicate(UuidDeduplicationOptions.newBuilder().build()));
-  }
-
-  // This static out of band communication is needed to retain serializability.
-  @GuardedBy("ReadWriteIT.class")
-  private static final List<SequencedMessage> received = new ArrayList<>();
-
-  private static synchronized void addMessageReceived(SequencedMessage message) {
-    received.add(message);
-  }
-
-  private static synchronized List<SequencedMessage> getTestQuickstartReceived() {
-    return ImmutableList.copyOf(received);
-  }
-
-  private static PTransform<PCollection<? extends SequencedMessage>, PCollection<Void>>
-      collectTestQuickstart() {
-    return MapElements.via(
-        new SimpleFunction<SequencedMessage, Void>() {
-          @Override
-          public Void apply(SequencedMessage input) {
-            addMessageReceived(input);
-            return null;
-          }
-        });
-  }
-
-  @Test
-  public void testReadWrite() throws Exception {
-    pipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
-    pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);
-
-    TopicPath topic = createTopic(getProject(pipeline.getOptions()));
-    SubscriptionPath subscription = createSubscription(topic);
-
-    // Publish some messages
-    writeMessages(topic, pipeline);
-
-    // Read some messages. They should be deduplicated by the time we see them, so there should be
-    // exactly numMessages, one for every index in [0,MESSAGE_COUNT).
-    PCollection<SequencedMessage> messages = readMessages(subscription, pipeline);
-    messages.apply("messageReceiver", collectTestQuickstart());
-    pipeline.run();
-    LOG.info("Running!");
-    for (int round = 0; round < 120; ++round) {
-      Thread.sleep(1000);
-      Map<Integer, Integer> receivedCounts = new HashMap<>();
-      for (SequencedMessage message : getTestQuickstartReceived()) {
-        int id = Integer.parseInt(message.getMessage().getData().toStringUtf8());
-        receivedCounts.put(id, receivedCounts.getOrDefault(id, 0) + 1);
-      }
-      LOG.info("Performing comparison round {}.\n", round);
-      boolean done = true;
-      List<Integer> missing = new ArrayList<>();
-      for (int id = 0; id < MESSAGE_COUNT; id++) {
-        int idCount = receivedCounts.getOrDefault(id, 0);
-        if (idCount == 0) {
-          missing.add(id);
-          done = false;
-        }
-        if (idCount > 1) {
-          fail(String.format("Failed to deduplicate message with id %s.", id));
-        }
-      }
-      LOG.info("Still messing messages: {}.\n", missing);
-      if (done) {
-        return;
-      }
-    }
-    fail(
-        String.format(
-            "Failed to receive all messages after 2 minutes. Received %s messages.",
-            getTestQuickstartReceived().size()));
-  }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java
index 3d743758..dbf3b93 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java
@@ -31,6 +31,7 @@
 import static org.mockito.Mockito.when;
 import static org.mockito.MockitoAnnotations.initMocks;
 
+import com.google.api.core.ApiFutures;
 import com.google.api.gax.rpc.ApiException;
 import com.google.api.gax.rpc.StatusCode.Code;
 import com.google.cloud.pubsublite.Offset;
@@ -39,6 +40,7 @@
 import com.google.cloud.pubsublite.internal.wire.Subscriber;
 import com.google.cloud.pubsublite.proto.Cursor;
 import com.google.cloud.pubsublite.proto.FlowControlRequest;
+import com.google.cloud.pubsublite.proto.SeekRequest;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.protobuf.util.Timestamps;
 import java.util.List;
@@ -62,7 +64,7 @@
 @RunWith(JUnit4.class)
 @SuppressWarnings("initialization.fields.uninitialized")
 public class SubscriptionPartitionProcessorImplTest {
-  @Spy RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;
+  @Spy RestrictionTracker<OffsetRange, OffsetByteProgress> tracker;
   @Mock OutputReceiver<SequencedMessage> receiver;
   @Mock Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory;
 
@@ -81,10 +83,6 @@
         .build();
   }
 
-  private OffsetByteRange initialRange() {
-    return OffsetByteRange.of(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
-  }
-
   @Before
   public void setUp() {
     initMocks(this);
@@ -102,11 +100,18 @@
 
   @Test
   public void lifecycle() throws Exception {
-    when(tracker.currentRestriction()).thenReturn(initialRange());
+    when(tracker.currentRestriction())
+        .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
+    when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class)));
     processor.start();
     verify(subscriber).startAsync();
     verify(subscriber).awaitRunning();
     verify(subscriber)
+        .seek(
+            SeekRequest.newBuilder()
+                .setCursor(Cursor.newBuilder().setOffset(example(Offset.class).value()))
+                .build());
+    verify(subscriber)
         .allowFlow(
             FlowControlRequest.newBuilder()
                 .setAllowedBytes(DEFAULT_FLOW_CONTROL.bytesOutstanding())
@@ -118,15 +123,29 @@
   }
 
   @Test
-  public void lifecycleFlowControlThrows() throws Exception {
-    when(tracker.currentRestriction()).thenReturn(initialRange());
+  public void lifecycleSeekThrows() throws Exception {
+    when(tracker.currentRestriction())
+        .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
+    when(subscriber.seek(any()))
+        .thenReturn(ApiFutures.immediateFailedFuture(new CheckedApiException(Code.OUT_OF_RANGE)));
     doThrow(new CheckedApiException(Code.OUT_OF_RANGE)).when(subscriber).allowFlow(any());
     assertThrows(CheckedApiException.class, () -> processor.start());
   }
 
   @Test
+  public void lifecycleFlowControlThrows() {
+    when(tracker.currentRestriction())
+        .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
+    when(subscriber.seek(any()))
+        .thenReturn(ApiFutures.immediateFailedFuture(new CheckedApiException(Code.OUT_OF_RANGE)));
+    assertThrows(CheckedApiException.class, () -> processor.start());
+  }
+
+  @Test
   public void lifecycleSubscriberAwaitThrows() throws Exception {
-    when(tracker.currentRestriction()).thenReturn(initialRange());
+    when(tracker.currentRestriction())
+        .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
+    when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class)));
     processor.start();
     doThrow(new CheckedApiException(Code.INTERNAL).underlying).when(subscriber).awaitTerminated();
     assertThrows(ApiException.class, () -> processor.close());
@@ -136,19 +155,21 @@
 
   @Test
   public void subscriberFailureFails() throws Exception {
-    when(tracker.currentRestriction()).thenReturn(initialRange());
+    when(tracker.currentRestriction())
+        .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
+    when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class)));
     processor.start();
     subscriber.fail(new CheckedApiException(Code.OUT_OF_RANGE));
     ApiException e =
-        assertThrows(
-            // Longer wait is needed due to listener asynchrony.
-            ApiException.class, () -> processor.waitForCompletion(Duration.standardSeconds(1)));
+        assertThrows(ApiException.class, () -> processor.waitForCompletion(Duration.ZERO));
     assertEquals(Code.OUT_OF_RANGE, e.getStatusCode().getCode());
   }
 
   @Test
   public void allowFlowFailureFails() throws Exception {
-    when(tracker.currentRestriction()).thenReturn(initialRange());
+    when(tracker.currentRestriction())
+        .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
+    when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class)));
     processor.start();
     when(tracker.tryClaim(any())).thenReturn(true);
     doThrow(new CheckedApiException(Code.OUT_OF_RANGE)).when(subscriber).allowFlow(any());