Merge pull request #15629 from apache/revert-15608-dpcollins-cherry-pick
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 480a2d2..59ca672 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -446,7 +446,7 @@
def errorprone_version = "2.3.4"
def google_clients_version = "1.31.0"
def google_cloud_bigdataoss_version = "2.2.2"
- def google_cloud_pubsublite_version = "1.0.4"
+ def google_cloud_pubsublite_version = "0.13.2"
def google_code_gson_version = "2.8.6"
def google_oauth_clients_version = "1.31.0"
// Try to keep grpc_version consistent with gRPC version in google_cloud_platform_libraries_bom
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java
index bf6a288..b0cc681 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java
@@ -24,36 +24,25 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
/** Common util functions for converting between PubsubMessage proto and {@link PubsubMessage}. */
-public final class PubsubMessages {
- private PubsubMessages() {}
-
- public static com.google.pubsub.v1.PubsubMessage toProto(PubsubMessage input) {
- Map<String, String> attributes = input.getAttributeMap();
- com.google.pubsub.v1.PubsubMessage.Builder message =
- com.google.pubsub.v1.PubsubMessage.newBuilder()
- .setData(ByteString.copyFrom(input.getPayload()));
- // TODO(BEAM-8085) this should not be null
- if (attributes != null) {
- message.putAllAttributes(attributes);
- }
- String messageId = input.getMessageId();
- if (messageId != null) {
- message.setMessageId(messageId);
- }
- return message.build();
- }
-
- public static PubsubMessage fromProto(com.google.pubsub.v1.PubsubMessage input) {
- return new PubsubMessage(
- input.getData().toByteArray(), input.getAttributesMap(), input.getMessageId());
- }
-
+public class PubsubMessages {
// Convert the PubsubMessage to a PubsubMessage proto, then return its serialized representation.
public static class ParsePayloadAsPubsubMessageProto
implements SerializableFunction<PubsubMessage, byte[]> {
@Override
public byte[] apply(PubsubMessage input) {
- return toProto(input).toByteArray();
+ Map<String, String> attributes = input.getAttributeMap();
+ com.google.pubsub.v1.PubsubMessage.Builder message =
+ com.google.pubsub.v1.PubsubMessage.newBuilder()
+ .setData(ByteString.copyFrom(input.getPayload()));
+ // TODO(BEAM-8085) this should not be null
+ if (attributes != null) {
+ message.putAllAttributes(attributes);
+ }
+ String messageId = input.getMessageId();
+ if (messageId != null) {
+ message.setMessageId(messageId);
+ }
+ return message.build().toByteArray();
}
}
@@ -65,7 +54,8 @@
try {
com.google.pubsub.v1.PubsubMessage message =
com.google.pubsub.v1.PubsubMessage.parseFrom(input);
- return fromProto(message);
+ return new PubsubMessage(
+ message.getData().toByteArray(), message.getAttributesMap(), message.getMessageId());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Could not decode Pubsub message", e);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubChecks.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubChecks.java
new file mode 100644
index 0000000..6dc1516
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubChecks.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer;
+
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.proto.PubSubMessage;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A class providing a conversion validity check between Cloud Pub/Sub and Pub/Sub Lite message
+ * types.
+ */
+public final class CloudPubsubChecks {
+ private CloudPubsubChecks() {}
+
+ /**
+ * Ensure that all messages that pass through can be converted to Cloud Pub/Sub messages using the
+ * standard transformation methods in the client library.
+ *
+ * <p>Will fail the pipeline if a message has multiple attributes per key.
+ */
+ public static PTransform<PCollection<? extends PubSubMessage>, PCollection<PubSubMessage>>
+ ensureUsableAsCloudPubsub() {
+ return MapElements.into(TypeDescriptor.of(PubSubMessage.class))
+ .via(
+ message -> {
+ Object unused = toCpsPublishTransformer().transform(Message.fromProto(message));
+ return message;
+ });
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java
deleted file mode 100644
index 1140c11..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer;
-import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer;
-import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer;
-
-import com.google.cloud.pubsublite.Message;
-import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor;
-import com.google.cloud.pubsublite.proto.PubSubMessage;
-import com.google.cloud.pubsublite.proto.SequencedMessage;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/** A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message types. */
-public final class CloudPubsubTransforms {
- private CloudPubsubTransforms() {}
- /**
- * Ensure that all messages that pass through can be converted to Cloud Pub/Sub messages using the
- * standard transformation methods in the client library.
- *
- * <p>Will fail the pipeline if a message has multiple attributes per key.
- */
- public static PTransform<PCollection<PubSubMessage>, PCollection<PubSubMessage>>
- ensureUsableAsCloudPubsub() {
- return new PTransform<PCollection<PubSubMessage>, PCollection<PubSubMessage>>() {
- @Override
- public PCollection<PubSubMessage> expand(PCollection<PubSubMessage> input) {
- return input.apply(
- MapElements.into(TypeDescriptor.of(PubSubMessage.class))
- .via(
- message -> {
- Object unused =
- toCpsPublishTransformer().transform(Message.fromProto(message));
- return message;
- }));
- }
- };
- }
-
- /**
- * Transform messages read from Pub/Sub Lite to their equivalent Cloud Pub/Sub Message that would
- * have been read from PubsubIO.
- *
- * <p>Will fail the pipeline if a message has multiple attributes per map key.
- */
- public static PTransform<PCollection<SequencedMessage>, PCollection<PubsubMessage>>
- toCloudPubsubMessages() {
- return new PTransform<PCollection<SequencedMessage>, PCollection<PubsubMessage>>() {
- @Override
- public PCollection<PubsubMessage> expand(PCollection<SequencedMessage> input) {
- return input.apply(
- MapElements.into(TypeDescriptor.of(PubsubMessage.class))
- .via(
- message ->
- PubsubMessages.fromProto(
- toCpsSubscribeTransformer()
- .transform(
- com.google.cloud.pubsublite.SequencedMessage.fromProto(
- message)))));
- }
- };
- }
-
- /**
- * Transform messages publishable using PubsubIO to their equivalent Pub/Sub Lite publishable
- * message.
- */
- public static PTransform<PCollection<PubsubMessage>, PCollection<PubSubMessage>>
- fromCloudPubsubMessages() {
- return new PTransform<PCollection<PubsubMessage>, PCollection<PubSubMessage>>() {
- @Override
- public PCollection<PubSubMessage> expand(PCollection<PubsubMessage> input) {
- return input.apply(
- MapElements.into(TypeDescriptor.of(PubSubMessage.class))
- .via(
- message ->
- fromCpsPublishTransformer(KeyExtractor.DEFAULT)
- .transform(PubsubMessages.toProto(message))
- .toProto()));
- }
- };
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactory.java
deleted file mode 100644
index de0cf43..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import java.io.Serializable;
-
-/**
- * A ManagedBacklogReaderFactory produces TopicBacklogReaders and tears down any produced readers
- * when it is itself closed.
- *
- * <p>close() should never be called on produced readers.
- */
-public interface ManagedBacklogReaderFactory extends AutoCloseable, Serializable {
- TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition);
-
- @Override
- void close();
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactoryImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactoryImpl.java
deleted file mode 100644
index 9a337bf..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactoryImpl.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import com.google.api.gax.rpc.ApiException;
-import com.google.cloud.pubsublite.Offset;
-import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
-import java.util.HashMap;
-import java.util.Map;
-import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-
-public class ManagedBacklogReaderFactoryImpl implements ManagedBacklogReaderFactory {
- private final SerializableFunction<SubscriptionPartition, TopicBacklogReader> newReader;
-
- @GuardedBy("this")
- private final Map<SubscriptionPartition, TopicBacklogReader> readers = new HashMap<>();
-
- ManagedBacklogReaderFactoryImpl(
- SerializableFunction<SubscriptionPartition, TopicBacklogReader> newReader) {
- this.newReader = newReader;
- }
-
- private static final class NonCloseableTopicBacklogReader implements TopicBacklogReader {
- private final TopicBacklogReader underlying;
-
- NonCloseableTopicBacklogReader(TopicBacklogReader underlying) {
- this.underlying = underlying;
- }
-
- @Override
- public ComputeMessageStatsResponse computeMessageStats(Offset offset) throws ApiException {
- return underlying.computeMessageStats(offset);
- }
-
- @Override
- public void close() {
- throw new IllegalArgumentException(
- "Cannot call close() on a reader returned from ManagedBacklogReaderFactory.");
- }
- }
-
- @Override
- public synchronized TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition) {
- return new NonCloseableTopicBacklogReader(
- readers.computeIfAbsent(subscriptionPartition, newReader::apply));
- }
-
- @Override
- public synchronized void close() {
- readers.values().forEach(TopicBacklogReader::close);
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRange.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRange.java
deleted file mode 100644
index b39d87e..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRange.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import com.google.auto.value.AutoValue;
-import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.io.range.OffsetRange;
-
-@AutoValue
-@DefaultCoder(OffsetByteRangeCoder.class)
-abstract class OffsetByteRange {
- abstract OffsetRange getRange();
-
- abstract long getByteCount();
-
- static OffsetByteRange of(OffsetRange range, long byteCount) {
- return new AutoValue_OffsetByteRange(range, byteCount);
- }
-
- static OffsetByteRange of(OffsetRange range) {
- return of(range, 0);
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeCoder.java
deleted file mode 100644
index 076cda1..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeCoder.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderProvider;
-import org.apache.beam.sdk.coders.CoderProviders;
-import org.apache.beam.sdk.coders.DelegateCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.range.OffsetRange;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-public class OffsetByteRangeCoder extends AtomicCoder<OffsetByteRange> {
- private static final Coder<OffsetByteRange> CODER =
- DelegateCoder.of(
- KvCoder.of(OffsetRange.Coder.of(), VarLongCoder.of()),
- OffsetByteRangeCoder::toKv,
- OffsetByteRangeCoder::fromKv);
-
- private static KV<OffsetRange, Long> toKv(OffsetByteRange value) {
- return KV.of(value.getRange(), value.getByteCount());
- }
-
- private static OffsetByteRange fromKv(KV<OffsetRange, Long> kv) {
- return OffsetByteRange.of(kv.getKey(), kv.getValue());
- }
-
- @Override
- public void encode(OffsetByteRange value, OutputStream outStream) throws IOException {
- CODER.encode(value, outStream);
- }
-
- @Override
- public OffsetByteRange decode(InputStream inStream) throws IOException {
- return CODER.decode(inStream);
- }
-
- public static CoderProvider getCoderProvider() {
- return CoderProviders.forCoder(
- TypeDescriptor.of(OffsetByteRange.class), new OffsetByteRangeCoder());
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java
index da9aaaa..608af8f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java
@@ -26,6 +26,8 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
import org.joda.time.Duration;
@@ -42,27 +44,24 @@
* received. IMPORTANT: minTrackingTime must be strictly smaller than the SDF read timeout when it
* would return ProcessContinuation.resume().
*/
-class OffsetByteRangeTracker extends TrackerWithProgress {
- private final TopicBacklogReader unownedBacklogReader;
+class OffsetByteRangeTracker extends RestrictionTracker<OffsetRange, OffsetByteProgress>
+ implements HasProgress {
+ private final TopicBacklogReader backlogReader;
private final Duration minTrackingTime;
private final long minBytesReceived;
private final Stopwatch stopwatch;
- private OffsetByteRange range;
+ private OffsetRange range;
private @Nullable Long lastClaimed;
+ private long byteCount = 0;
public OffsetByteRangeTracker(
- OffsetByteRange range,
- TopicBacklogReader unownedBacklogReader,
+ OffsetRange range,
+ TopicBacklogReader backlogReader,
Stopwatch stopwatch,
Duration minTrackingTime,
long minBytesReceived) {
- checkArgument(
- range.getRange().getTo() == Long.MAX_VALUE,
- "May only construct OffsetByteRangeTracker with an unbounded range with no progress.");
- checkArgument(
- range.getByteCount() == 0L,
- "May only construct OffsetByteRangeTracker with an unbounded range with no progress.");
- this.unownedBacklogReader = unownedBacklogReader;
+ checkArgument(range.getTo() == Long.MAX_VALUE);
+ this.backlogReader = backlogReader;
this.minTrackingTime = minTrackingTime;
this.minBytesReceived = minBytesReceived;
this.stopwatch = stopwatch.reset().start();
@@ -70,6 +69,11 @@
}
@Override
+ public void finalize() {
+ this.backlogReader.close();
+ }
+
+ @Override
public IsBounded isBounded() {
return IsBounded.UNBOUNDED;
}
@@ -83,32 +87,32 @@
position.lastOffset().value(),
lastClaimed);
checkArgument(
- toClaim >= range.getRange().getFrom(),
+ toClaim >= range.getFrom(),
"Trying to claim offset %s before start of the range %s",
toClaim,
range);
// split() has already been called, truncating this range. No more offsets may be claimed.
- if (range.getRange().getTo() != Long.MAX_VALUE) {
- boolean isRangeEmpty = range.getRange().getTo() == range.getRange().getFrom();
- boolean isValidClosedRange = nextOffset() == range.getRange().getTo();
+ if (range.getTo() != Long.MAX_VALUE) {
+ boolean isRangeEmpty = range.getTo() == range.getFrom();
+ boolean isValidClosedRange = nextOffset() == range.getTo();
checkState(
isRangeEmpty || isValidClosedRange,
"Violated class precondition: offset range improperly split. Please report a beam bug.");
return false;
}
lastClaimed = toClaim;
- range = OffsetByteRange.of(range.getRange(), range.getByteCount() + position.batchBytes());
+ byteCount += position.batchBytes();
return true;
}
@Override
- public OffsetByteRange currentRestriction() {
+ public OffsetRange currentRestriction() {
return range;
}
private long nextOffset() {
checkState(lastClaimed == null || lastClaimed < Long.MAX_VALUE);
- return lastClaimed == null ? currentRestriction().getRange().getFrom() : lastClaimed + 1;
+ return lastClaimed == null ? currentRestriction().getFrom() : lastClaimed + 1;
}
/**
@@ -120,33 +124,29 @@
if (duration.isLongerThan(minTrackingTime)) {
return true;
}
- if (currentRestriction().getByteCount() >= minBytesReceived) {
+ if (byteCount >= minBytesReceived) {
return true;
}
return false;
}
@Override
- public @Nullable SplitResult<OffsetByteRange> trySplit(double fractionOfRemainder) {
+ public @Nullable SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
// Cannot split a bounded range. This should already be completely claimed.
- if (range.getRange().getTo() != Long.MAX_VALUE) {
+ if (range.getTo() != Long.MAX_VALUE) {
return null;
}
if (!receivedEnough()) {
return null;
}
- range =
- OffsetByteRange.of(
- new OffsetRange(currentRestriction().getRange().getFrom(), nextOffset()),
- range.getByteCount());
- return SplitResult.of(
- this.range, OffsetByteRange.of(new OffsetRange(nextOffset(), Long.MAX_VALUE), 0));
+ range = new OffsetRange(currentRestriction().getFrom(), nextOffset());
+ return SplitResult.of(this.range, new OffsetRange(nextOffset(), Long.MAX_VALUE));
}
@Override
@SuppressWarnings("unboxing.of.nullable")
public void checkDone() throws IllegalStateException {
- if (range.getRange().getFrom() == range.getRange().getTo()) {
+ if (range.getFrom() == range.getTo()) {
return;
}
checkState(
@@ -155,18 +155,18 @@
range);
long lastClaimedNotNull = checkNotNull(lastClaimed);
checkState(
- lastClaimedNotNull >= range.getRange().getTo() - 1,
+ lastClaimedNotNull >= range.getTo() - 1,
"Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted",
lastClaimedNotNull,
range,
lastClaimedNotNull + 1,
- range.getRange().getTo());
+ range.getTo());
}
@Override
public Progress getProgress() {
ComputeMessageStatsResponse stats =
- this.unownedBacklogReader.computeMessageStats(Offset.of(nextOffset()));
- return Progress.from(range.getByteCount(), stats.getMessageBytes());
+ this.backlogReader.computeMessageStats(Offset.of(nextOffset()));
+ return Progress.from(byteCount, stats.getMessageBytes());
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java
index d7526d8..623e20c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java
@@ -27,8 +27,4 @@
private PerServerPublisherCache() {}
static final PublisherCache PUBLISHER_CACHE = new PublisherCache();
-
- static {
- Runtime.getRuntime().addShutdownHook(new Thread(PUBLISHER_CACHE::close));
- }
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java
index fdf7920..a9f7a43 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java
@@ -17,12 +17,13 @@
*/
package org.apache.beam.sdk.io.gcp.pubsublite;
-import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;
+import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import com.google.cloud.pubsublite.Offset;
-import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.proto.SequencedMessage;
+import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
@@ -34,35 +35,31 @@
class PerSubscriptionPartitionSdf extends DoFn<SubscriptionPartition, SequencedMessage> {
private final Duration maxSleepTime;
- private final ManagedBacklogReaderFactory backlogReaderFactory;
private final SubscriptionPartitionProcessorFactory processorFactory;
private final SerializableFunction<SubscriptionPartition, InitialOffsetReader>
offsetReaderFactory;
- private final SerializableBiFunction<TopicBacklogReader, OffsetByteRange, TrackerWithProgress>
+ private final SerializableBiFunction<
+ SubscriptionPartition, OffsetRange, RestrictionTracker<OffsetRange, OffsetByteProgress>>
trackerFactory;
private final SerializableFunction<SubscriptionPartition, Committer> committerFactory;
PerSubscriptionPartitionSdf(
Duration maxSleepTime,
- ManagedBacklogReaderFactory backlogReaderFactory,
SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory,
- SerializableBiFunction<TopicBacklogReader, OffsetByteRange, TrackerWithProgress>
+ SerializableBiFunction<
+ SubscriptionPartition,
+ OffsetRange,
+ RestrictionTracker<OffsetRange, OffsetByteProgress>>
trackerFactory,
SubscriptionPartitionProcessorFactory processorFactory,
SerializableFunction<SubscriptionPartition, Committer> committerFactory) {
this.maxSleepTime = maxSleepTime;
- this.backlogReaderFactory = backlogReaderFactory;
this.processorFactory = processorFactory;
this.offsetReaderFactory = offsetReaderFactory;
this.trackerFactory = trackerFactory;
this.committerFactory = committerFactory;
}
- @Teardown
- public void teardown() {
- backlogReaderFactory.close();
- }
-
@GetInitialWatermarkEstimatorState
public Instant getInitialWatermarkState() {
return Instant.EPOCH;
@@ -75,7 +72,7 @@
@ProcessElement
public ProcessContinuation processElement(
- RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
+ RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
@Element SubscriptionPartition subscriptionPartition,
OutputReceiver<SequencedMessage> receiver)
throws Exception {
@@ -86,44 +83,38 @@
processor
.lastClaimed()
.ifPresent(
- lastClaimedOffset -> {
+ lastClaimedOffset ->
+ /* TODO(boyuanzz): When default dataflow can use finalizers, undo this.
+ finalizer.afterBundleCommit(
+ Instant.ofEpochMilli(Long.MAX_VALUE),
+ () -> */ {
Committer committer = committerFactory.apply(subscriptionPartition);
committer.startAsync().awaitRunning();
// Commit the next-to-deliver offset.
try {
committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();
+ } catch (ExecutionException e) {
+ throw toCanonical(checkArgumentNotNull(e.getCause())).underlying;
} catch (Exception e) {
- throw ExtractStatus.toCanonical(e).underlying;
+ throw toCanonical(e).underlying;
}
- blockingShutdown(committer);
+ committer.stopAsync().awaitTerminated();
});
return result;
}
}
@GetInitialRestriction
- public OffsetByteRange getInitialRestriction(
- @Element SubscriptionPartition subscriptionPartition) {
+ public OffsetRange getInitialRestriction(@Element SubscriptionPartition subscriptionPartition) {
try (InitialOffsetReader reader = offsetReaderFactory.apply(subscriptionPartition)) {
Offset offset = reader.read();
- return OffsetByteRange.of(
- new OffsetRange(offset.value(), Long.MAX_VALUE /* open interval */));
+ return new OffsetRange(offset.value(), Long.MAX_VALUE /* open interval */);
}
}
@NewTracker
- public TrackerWithProgress newTracker(
- @Element SubscriptionPartition subscriptionPartition, @Restriction OffsetByteRange range) {
- return trackerFactory.apply(backlogReaderFactory.newReader(subscriptionPartition), range);
- }
-
- @GetSize
- public double getSize(
- @Element SubscriptionPartition subscriptionPartition,
- @Restriction OffsetByteRange restriction) {
- if (restriction.getRange().getTo() != Long.MAX_VALUE) {
- return restriction.getByteCount();
- }
- return newTracker(subscriptionPartition, restriction).getProgress().getWorkRemaining();
+ public RestrictionTracker<OffsetRange, OffsetByteProgress> newTracker(
+ @Element SubscriptionPartition subscriptionPartition, @Restriction OffsetRange range) {
+ return trackerFactory.apply(subscriptionPartition, range);
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java
index 3dbdec6..f8dc24b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java
@@ -23,50 +23,52 @@
import com.google.api.core.ApiService.State;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.MessageMetadata;
+import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.Publisher;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.HashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
/** A map of working publishers by PublisherOptions. */
-class PublisherCache implements AutoCloseable {
- @GuardedBy("this")
+class PublisherCache {
+ private final CloseableMonitor monitor = new CloseableMonitor();
+
+ private final Executor listenerExecutor = Executors.newSingleThreadExecutor();
+
+ @GuardedBy("monitor.monitor")
private final HashMap<PublisherOptions, Publisher<MessageMetadata>> livePublishers =
new HashMap<>();
- private synchronized void evict(PublisherOptions options) {
- livePublishers.remove(options);
- }
-
- synchronized Publisher<MessageMetadata> get(PublisherOptions options) throws ApiException {
+ Publisher<MessageMetadata> get(PublisherOptions options) throws ApiException {
checkArgument(options.usesCache());
- Publisher<MessageMetadata> publisher = livePublishers.get(options);
- if (publisher != null) {
+ try (CloseableMonitor.Hold h = monitor.enter()) {
+ Publisher<MessageMetadata> publisher = livePublishers.get(options);
+ if (publisher != null) {
+ return publisher;
+ }
+ publisher = Publishers.newPublisher(options);
+ livePublishers.put(options, publisher);
+ publisher.addListener(
+ new Listener() {
+ @Override
+ public void failed(State s, Throwable t) {
+ try (CloseableMonitor.Hold h = monitor.enter()) {
+ livePublishers.remove(options);
+ }
+ }
+ },
+ listenerExecutor);
+ publisher.startAsync().awaitRunning();
return publisher;
}
- publisher = Publishers.newPublisher(options);
- livePublishers.put(options, publisher);
- publisher.addListener(
- new Listener() {
- @Override
- public void failed(State s, Throwable t) {
- evict(options);
- }
- },
- SystemExecutors.getFuturesExecutor());
- publisher.startAsync().awaitRunning();
- return publisher;
}
@VisibleForTesting
- synchronized void set(PublisherOptions options, Publisher<MessageMetadata> toCache) {
- livePublishers.put(options, toCache);
- }
-
- @Override
- public synchronized void close() {
- livePublishers.forEach(((options, publisher) -> publisher.stopAsync()));
- livePublishers.clear();
+ void set(PublisherOptions options, Publisher<MessageMetadata> toCache) {
+ try (CloseableMonitor.Hold h = monitor.enter()) {
+ livePublishers.put(options, toCache);
+ }
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java
index 67ea6cf..34012f7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java
@@ -17,27 +17,17 @@
*/
package org.apache.beam.sdk.io.gcp.pubsublite;
-import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.UncheckedApiPreconditions.checkArgument;
-import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
-import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.MessageMetadata;
-import com.google.cloud.pubsublite.Partition;
-import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
+import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings;
-import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
-import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
-import com.google.cloud.pubsublite.v1.AdminServiceClient;
-import com.google.cloud.pubsublite.v1.AdminServiceSettings;
-import com.google.cloud.pubsublite.v1.PublisherServiceClient;
-import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken;
class Publishers {
@@ -45,38 +35,6 @@
private Publishers() {}
- private static AdminClient newAdminClient(PublisherOptions options) throws ApiException {
- try {
- return AdminClient.create(
- AdminClientSettings.newBuilder()
- .setServiceClient(
- AdminServiceClient.create(
- addDefaultSettings(
- options.topicPath().location().extractRegion(),
- AdminServiceSettings.newBuilder())))
- .setRegion(options.topicPath().location().extractRegion())
- .build());
- } catch (Throwable t) {
- throw toCanonical(t).underlying;
- }
- }
-
- private static PublisherServiceClient newServiceClient(
- PublisherOptions options, Partition partition) {
- PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder();
- settingsBuilder =
- addDefaultMetadata(
- PubsubContext.of(FRAMEWORK),
- RoutingMetadata.of(options.topicPath(), partition),
- settingsBuilder);
- try {
- return PublisherServiceClient.create(
- addDefaultSettings(options.topicPath().location().extractRegion(), settingsBuilder));
- } catch (Throwable t) {
- throw toCanonical(t).underlying;
- }
- }
-
@SuppressWarnings("unchecked")
static Publisher<MessageMetadata> newPublisher(PublisherOptions options) throws ApiException {
SerializableSupplier<Object> supplier = options.publisherSupplier();
@@ -86,18 +44,20 @@
checkArgument(token.isSupertypeOf(supplied.getClass()));
return (Publisher<MessageMetadata>) supplied;
}
- return PartitionCountWatchingPublisherSettings.newBuilder()
- .setTopic(options.topicPath())
- .setPublisherFactory(
- partition ->
- SinglePartitionPublisherBuilder.newBuilder()
- .setTopic(options.topicPath())
- .setPartition(partition)
- .setServiceClient(newServiceClient(options, partition))
- .setBatchingSettings(PublisherSettings.DEFAULT_BATCHING_SETTINGS)
- .build())
- .setAdminClient(newAdminClient(options))
- .build()
- .instantiate();
+
+ TopicPath topic = options.topicPath();
+ PartitionCountWatchingPublisherSettings.Builder publisherSettings =
+ PartitionCountWatchingPublisherSettings.newBuilder()
+ .setTopic(topic)
+ .setPublisherFactory(
+ partition ->
+ SinglePartitionPublisherBuilder.newBuilder()
+ .setTopic(topic)
+ .setPartition(partition)
+ .build())
+ .setAdminClient(
+ AdminClient.create(
+ AdminClientSettings.newBuilder().setRegion(topic.location().region()).build()));
+ return publisherSettings.build().instantiate();
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java
index b93ac61..ca1f2be 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java
@@ -107,7 +107,7 @@
* }</pre>
*/
public static PTransform<PCollection<PubSubMessage>, PDone> write(PublisherOptions options) {
- return new PTransform<PCollection<PubSubMessage>, PDone>() {
+ return new PTransform<PCollection<PubSubMessage>, PDone>("PubsubLiteIO") {
@Override
public PDone expand(PCollection<PubSubMessage> input) {
PubsubLiteSink sink = new PubsubLiteSink(options);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java
index d0e3afa..d3acdfa 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java
@@ -28,14 +28,16 @@
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.Publisher;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Deque;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOrError.Kind;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
/** A sink which publishes messages to Pub/Sub Lite. */
@SuppressWarnings({
@@ -54,6 +56,8 @@
@GuardedBy("this")
private transient Deque<CheckedApiException> errorsSinceLastFinish;
+ private static final Executor executor = Executors.newCachedThreadPool();
+
PubsubLiteSink(PublisherOptions options) {
this.options = options;
}
@@ -85,7 +89,7 @@
onFailure.accept(t);
}
},
- SystemExecutors.getFuturesExecutor());
+ MoreExecutors.directExecutor());
if (!options.usesCache()) {
publisher.startAsync();
}
@@ -126,7 +130,7 @@
onFailure.accept(t);
}
},
- SystemExecutors.getFuturesExecutor());
+ executor);
}
// Intentionally don't flush on bundle finish to allow multi-sink client reuse.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java
index b6a9f5d..9875880 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java
@@ -23,7 +23,6 @@
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
-import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.wire.Committer;
@@ -32,6 +31,8 @@
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -53,11 +54,10 @@
checkArgument(subscriptionPartition.subscription().equals(options.subscriptionPath()));
}
- private Subscriber newSubscriber(
- Partition partition, Offset initialOffset, Consumer<List<SequencedMessage>> consumer) {
+ private Subscriber newSubscriber(Partition partition, Consumer<List<SequencedMessage>> consumer) {
try {
return options
- .getSubscriberFactory(partition, initialOffset)
+ .getSubscriberFactory(partition)
.newSubscriber(
messages ->
consumer.accept(
@@ -71,31 +71,23 @@
private SubscriptionPartitionProcessor newPartitionProcessor(
SubscriptionPartition subscriptionPartition,
- RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
+ RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
OutputReceiver<SequencedMessage> receiver)
throws ApiException {
checkSubscription(subscriptionPartition);
return new SubscriptionPartitionProcessorImpl(
tracker,
receiver,
- consumer ->
- newSubscriber(
- subscriptionPartition.partition(),
- Offset.of(tracker.currentRestriction().getRange().getFrom()),
- consumer),
+ consumer -> newSubscriber(subscriptionPartition.partition(), consumer),
options.flowControlSettings());
}
- private TopicBacklogReader newBacklogReader(SubscriptionPartition subscriptionPartition) {
+ private RestrictionTracker<OffsetRange, OffsetByteProgress> newRestrictionTracker(
+ SubscriptionPartition subscriptionPartition, OffsetRange initial) {
checkSubscription(subscriptionPartition);
- return options.getBacklogReader(subscriptionPartition.partition());
- }
-
- private TrackerWithProgress newRestrictionTracker(
- TopicBacklogReader backlogReader, OffsetByteRange initial) {
return new OffsetByteRangeTracker(
initial,
- backlogReader,
+ options.getBacklogReader(subscriptionPartition.partition()),
Stopwatch.createUnstarted(),
options.minBundleTimeout(),
LongMath.saturatedMultiply(options.flowControlSettings().bytesOutstanding(), 10));
@@ -115,7 +107,7 @@
try (AdminClient admin =
AdminClient.create(
AdminClientSettings.newBuilder()
- .setRegion(options.subscriptionPath().location().extractRegion())
+ .setRegion(options.subscriptionPath().location().region())
.build())) {
return TopicPath.parse(admin.getSubscription(options.subscriptionPath()).get().getTopic());
} catch (Throwable t) {
@@ -126,15 +118,25 @@
@Override
public PCollection<SequencedMessage> expand(PBegin input) {
PCollection<SubscriptionPartition> subscriptionPartitions;
- subscriptionPartitions =
- input.apply(new SubscriptionPartitionLoader(getTopicPath(), options.subscriptionPath()));
+ if (options.partitions().isEmpty()) {
+ subscriptionPartitions =
+ input.apply(new SubscriptionPartitionLoader(getTopicPath(), options.subscriptionPath()));
+ } else {
+ subscriptionPartitions =
+ input.apply(
+ Create.of(
+ options.partitions().stream()
+ .map(
+ partition ->
+ SubscriptionPartition.of(options.subscriptionPath(), partition))
+ .collect(Collectors.toList())));
+ }
return subscriptionPartitions.apply(
ParDo.of(
new PerSubscriptionPartitionSdf(
// Ensure we read for at least 5 seconds more than the bundle timeout.
options.minBundleTimeout().plus(Duration.standardSeconds(5)),
- new ManagedBacklogReaderFactoryImpl(this::newBacklogReader),
this::newInitialOffsetReader,
this::newRestrictionTracker,
this::newPartitionProcessor,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java
index a9625be..0d3afe2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java
@@ -23,7 +23,6 @@
import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
-import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
@@ -36,13 +35,13 @@
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
-import com.google.cloud.pubsublite.proto.Cursor;
-import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.v1.CursorServiceClient;
import com.google.cloud.pubsublite.v1.CursorServiceSettings;
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
import java.io.Serializable;
+import java.util.Set;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
@@ -70,6 +69,11 @@
public abstract FlowControlSettings flowControlSettings();
/**
+ * A set of partitions. If empty, continuously poll the set of partitions using an admin client.
+ */
+ public abstract Set<Partition> partitions();
+
+ /**
* The minimum wall time to pass before allowing bundle closure.
*
* <p>Setting this to too small of a value will result in increased compute costs and lower
@@ -104,6 +108,7 @@
public static Builder newBuilder() {
Builder builder = new AutoValue_SubscriberOptions.Builder();
return builder
+ .setPartitions(ImmutableSet.of())
.setFlowControlSettings(DEFAULT_FLOW_CONTROL)
.setMinBundleTimeout(MIN_BUNDLE_TIMEOUT);
}
@@ -114,19 +119,20 @@
throws ApiException {
try {
SubscriberServiceSettings.Builder settingsBuilder = SubscriberServiceSettings.newBuilder();
+
settingsBuilder =
addDefaultMetadata(
PubsubContext.of(FRAMEWORK),
RoutingMetadata.of(subscriptionPath(), partition),
settingsBuilder);
return SubscriberServiceClient.create(
- addDefaultSettings(subscriptionPath().location().extractRegion(), settingsBuilder));
+ addDefaultSettings(subscriptionPath().location().region(), settingsBuilder));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
}
- SubscriberFactory getSubscriberFactory(Partition partition, Offset initialOffset) {
+ SubscriberFactory getSubscriberFactory(Partition partition) {
SubscriberFactory factory = subscriberFactory();
if (factory != null) {
return factory;
@@ -137,10 +143,6 @@
.setSubscriptionPath(subscriptionPath())
.setPartition(partition)
.setServiceClient(newSubscriberServiceClient(partition))
- .setInitialLocation(
- SeekRequest.newBuilder()
- .setCursor(Cursor.newBuilder().setOffset(initialOffset.value()))
- .build())
.build();
}
@@ -148,7 +150,7 @@
try {
return CursorServiceClient.create(
addDefaultSettings(
- subscriptionPath().location().extractRegion(), CursorServiceSettings.newBuilder()));
+ subscriptionPath().location().region(), CursorServiceSettings.newBuilder()));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
@@ -187,7 +189,7 @@
return new InitialOffsetReaderImpl(
CursorClient.create(
CursorClientSettings.newBuilder()
- .setRegion(subscriptionPath().location().extractRegion())
+ .setRegion(subscriptionPath().location().region())
.build()),
subscriptionPath(),
partition);
@@ -199,6 +201,8 @@
public abstract Builder setSubscriptionPath(SubscriptionPath path);
// Optional parameters
+ public abstract Builder setPartitions(Set<Partition> partitions);
+
public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);
public abstract Builder setMinBundleTimeout(Duration minBundleTimeout);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java
index 530c180..6bf3623 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java
@@ -19,6 +19,7 @@
import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.io.Serializable;
+import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -27,6 +28,6 @@
SubscriptionPartitionProcessor newProcessor(
SubscriptionPartition subscriptionPartition,
- RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
+ RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
OutputReceiver<SequencedMessage> receiver);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java
index a086d18..8d2a137 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.io.gcp.pubsublite;
-import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;
-
import com.google.api.core.ApiService.Listener;
import com.google.api.core.ApiService.State;
import com.google.cloud.pubsublite.Offset;
@@ -26,8 +24,9 @@
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
+import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
+import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.util.Timestamps;
import java.util.List;
@@ -37,17 +36,19 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
+import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
import org.joda.time.Duration;
import org.joda.time.Instant;
class SubscriptionPartitionProcessorImpl extends Listener
implements SubscriptionPartitionProcessor {
- private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;
+ private final RestrictionTracker<OffsetRange, OffsetByteProgress> tracker;
private final OutputReceiver<SequencedMessage> receiver;
private final Subscriber subscriber;
private final SettableFuture<Void> completionFuture = SettableFuture.create();
@@ -56,7 +57,7 @@
@SuppressWarnings("methodref.receiver.bound.invalid")
SubscriptionPartitionProcessorImpl(
- RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
+ RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
OutputReceiver<SequencedMessage> receiver,
Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory,
FlowControlSettings flowControlSettings) {
@@ -69,15 +70,23 @@
@Override
@SuppressWarnings("argument.type.incompatible")
public void start() throws CheckedApiException {
- this.subscriber.addListener(this, SystemExecutors.getFuturesExecutor());
+ this.subscriber.addListener(this, MoreExecutors.directExecutor());
this.subscriber.startAsync();
this.subscriber.awaitRunning();
try {
+ this.subscriber
+ .seek(
+ SeekRequest.newBuilder()
+ .setCursor(Cursor.newBuilder().setOffset(tracker.currentRestriction().getFrom()))
+ .build())
+ .get();
this.subscriber.allowFlow(
FlowControlRequest.newBuilder()
.setAllowedBytes(flowControlSettings.bytesOutstanding())
.setAllowedMessages(flowControlSettings.messagesOutstanding())
.build());
+ } catch (ExecutionException e) {
+ throw ExtractStatus.toCanonical(e.getCause());
} catch (Throwable t) {
throw ExtractStatus.toCanonical(t);
}
@@ -116,7 +125,7 @@
@Override
public void close() {
- blockingShutdown(subscriber);
+ subscriber.stopAsync().awaitTerminated();
}
@Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java
index 79db0f1..8c1dd94 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java
@@ -62,7 +62,7 @@
try (AdminClient adminClient =
AdminClient.create(
AdminClientSettings.newBuilder()
- .setRegion(subscriptionPath.location().extractRegion())
+ .setRegion(subscriptionPath.location().region())
.build())) {
return setTopicPath(
TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic()));
@@ -81,9 +81,7 @@
TopicBacklogReader instantiate() throws ApiException {
TopicStatsClientSettings settings =
- TopicStatsClientSettings.newBuilder()
- .setRegion(topicPath().location().extractRegion())
- .build();
+ TopicStatsClientSettings.newBuilder().setRegion(topicPath().location().region()).build();
TopicBacklogReader impl =
new TopicBacklogReaderImpl(TopicStatsClient.create(settings), topicPath(), partition());
return new LimitingTopicBacklogReader(impl, Ticker.systemTicker());
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TrackerWithProgress.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TrackerWithProgress.java
deleted file mode 100644
index 7f0d030..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TrackerWithProgress.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
-
-public abstract class TrackerWithProgress
- extends RestrictionTracker<OffsetByteRange, OffsetByteProgress> implements HasProgress {}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java
index 5a31f4f..f34ebb6 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java
@@ -49,7 +49,7 @@
private static final double IGNORED_FRACTION = -10000000.0;
private static final long MIN_BYTES = 1000;
private static final OffsetRange RANGE = new OffsetRange(123L, Long.MAX_VALUE);
- private final TopicBacklogReader unownedBacklogReader = mock(TopicBacklogReader.class);
+ private final TopicBacklogReader reader = mock(TopicBacklogReader.class);
@Spy Ticker ticker;
private OffsetByteRangeTracker tracker;
@@ -60,18 +60,14 @@
when(ticker.read()).thenReturn(0L);
tracker =
new OffsetByteRangeTracker(
- OffsetByteRange.of(RANGE, 0),
- unownedBacklogReader,
- Stopwatch.createUnstarted(ticker),
- Duration.millis(500),
- MIN_BYTES);
+ RANGE, reader, Stopwatch.createUnstarted(ticker), Duration.millis(500), MIN_BYTES);
}
@Test
public void progressTracked() {
assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(123), 10)));
assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(124), 11)));
- when(unownedBacklogReader.computeMessageStats(Offset.of(125)))
+ when(reader.computeMessageStats(Offset.of(125)))
.thenReturn(ComputeMessageStatsResponse.newBuilder().setMessageBytes(1000).build());
Progress progress = tracker.getProgress();
assertEquals(21, progress.getWorkCompleted(), .0001);
@@ -80,7 +76,7 @@
@Test
public void getProgressStatsFailure() {
- when(unownedBacklogReader.computeMessageStats(Offset.of(123)))
+ when(reader.computeMessageStats(Offset.of(123)))
.thenThrow(new CheckedApiException(Code.INTERNAL).underlying);
assertThrows(ApiException.class, tracker::getProgress);
}
@@ -90,15 +86,11 @@
public void claimSplitSuccess() {
assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_000), MIN_BYTES)));
assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(10_000), MIN_BYTES)));
- SplitResult<OffsetByteRange> splits = tracker.trySplit(IGNORED_FRACTION);
- OffsetByteRange primary = splits.getPrimary();
- assertEquals(RANGE.getFrom(), primary.getRange().getFrom());
- assertEquals(10_001, primary.getRange().getTo());
- assertEquals(MIN_BYTES * 2, primary.getByteCount());
- OffsetByteRange residual = splits.getResidual();
- assertEquals(10_001, residual.getRange().getFrom());
- assertEquals(Long.MAX_VALUE, residual.getRange().getTo());
- assertEquals(0, residual.getByteCount());
+ SplitResult<OffsetRange> splits = tracker.trySplit(IGNORED_FRACTION);
+ assertEquals(RANGE.getFrom(), splits.getPrimary().getFrom());
+ assertEquals(10_001, splits.getPrimary().getTo());
+ assertEquals(10_001, splits.getResidual().getFrom());
+ assertEquals(Long.MAX_VALUE, splits.getResidual().getTo());
assertEquals(splits.getPrimary(), tracker.currentRestriction());
tracker.checkDone();
assertNull(tracker.trySplit(IGNORED_FRACTION));
@@ -108,10 +100,10 @@
@SuppressWarnings({"dereference.of.nullable", "argument.type.incompatible"})
public void splitWithoutClaimEmpty() {
when(ticker.read()).thenReturn(100000000000000L);
- SplitResult<OffsetByteRange> splits = tracker.trySplit(IGNORED_FRACTION);
- assertEquals(RANGE.getFrom(), splits.getPrimary().getRange().getFrom());
- assertEquals(RANGE.getFrom(), splits.getPrimary().getRange().getTo());
- assertEquals(RANGE, splits.getResidual().getRange());
+ SplitResult<OffsetRange> splits = tracker.trySplit(IGNORED_FRACTION);
+ assertEquals(RANGE.getFrom(), splits.getPrimary().getFrom());
+ assertEquals(RANGE.getFrom(), splits.getPrimary().getTo());
+ assertEquals(RANGE, splits.getResidual());
assertEquals(splits.getPrimary(), tracker.currentRestriction());
tracker.checkDone();
assertNull(tracker.trySplit(IGNORED_FRACTION));
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java
index 0a4e3e7..598037e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java
@@ -28,7 +28,6 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
@@ -52,8 +51,6 @@
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.DoubleMath;
import org.joda.time.Duration;
import org.junit.Before;
import org.junit.Test;
@@ -68,24 +65,22 @@
public class PerSubscriptionPartitionSdfTest {
private static final Duration MAX_SLEEP_TIME =
Duration.standardMinutes(10).plus(Duration.millis(10));
- private static final OffsetByteRange RESTRICTION =
- OffsetByteRange.of(new OffsetRange(1, Long.MAX_VALUE), 0);
+ private static final OffsetRange RESTRICTION = new OffsetRange(1, Long.MAX_VALUE);
private static final SubscriptionPartition PARTITION =
SubscriptionPartition.of(example(SubscriptionPath.class), example(Partition.class));
@Mock SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory;
- @Mock ManagedBacklogReaderFactory backlogReaderFactory;
- @Mock TopicBacklogReader backlogReader;
-
@Mock
- SerializableBiFunction<TopicBacklogReader, OffsetByteRange, TrackerWithProgress> trackerFactory;
+ SerializableBiFunction<
+ SubscriptionPartition, OffsetRange, RestrictionTracker<OffsetRange, OffsetByteProgress>>
+ trackerFactory;
@Mock SubscriptionPartitionProcessorFactory processorFactory;
@Mock SerializableFunction<SubscriptionPartition, Committer> committerFactory;
@Mock InitialOffsetReader initialOffsetReader;
- @Spy TrackerWithProgress tracker;
+ @Spy RestrictionTracker<OffsetRange, OffsetByteProgress> tracker;
@Mock OutputReceiver<SequencedMessage> output;
@Mock SubscriptionPartitionProcessor processor;
@@ -103,11 +98,9 @@
when(trackerFactory.apply(any(), any())).thenReturn(tracker);
when(committerFactory.apply(any())).thenReturn(committer);
when(tracker.currentRestriction()).thenReturn(RESTRICTION);
- when(backlogReaderFactory.newReader(any())).thenReturn(backlogReader);
sdf =
new PerSubscriptionPartitionSdf(
MAX_SLEEP_TIME,
- backlogReaderFactory,
offsetReaderFactory,
trackerFactory,
processorFactory,
@@ -117,10 +110,9 @@
@Test
public void getInitialRestrictionReadSuccess() {
when(initialOffsetReader.read()).thenReturn(example(Offset.class));
- OffsetByteRange range = sdf.getInitialRestriction(PARTITION);
- assertEquals(example(Offset.class).value(), range.getRange().getFrom());
- assertEquals(Long.MAX_VALUE, range.getRange().getTo());
- assertEquals(0, range.getByteCount());
+ OffsetRange range = sdf.getInitialRestriction(PARTITION);
+ assertEquals(example(Offset.class).value(), range.getFrom());
+ assertEquals(Long.MAX_VALUE, range.getTo());
verify(offsetReaderFactory).apply(PARTITION);
}
@@ -133,13 +125,7 @@
@Test
public void newTrackerCallsFactory() {
assertSame(tracker, sdf.newTracker(PARTITION, RESTRICTION));
- verify(trackerFactory).apply(backlogReader, RESTRICTION);
- }
-
- @Test
- public void tearDownClosesBacklogReaderFactory() {
- sdf.teardown();
- verify(backlogReaderFactory).close();
+ verify(trackerFactory).apply(PARTITION, RESTRICTION);
}
@Test
@@ -173,48 +159,12 @@
order2.verify(committer).awaitTerminated();
}
- private static final class NoopManagedBacklogReaderFactory
- implements ManagedBacklogReaderFactory {
- @Override
- public TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition) {
- return null;
- }
-
- @Override
- public void close() {}
- }
-
@Test
@SuppressWarnings("return.type.incompatible")
public void dofnIsSerializable() throws Exception {
ObjectOutputStream output = new ObjectOutputStream(new ByteArrayOutputStream());
output.writeObject(
new PerSubscriptionPartitionSdf(
- MAX_SLEEP_TIME,
- new NoopManagedBacklogReaderFactory(),
- x -> null,
- (x, y) -> null,
- (x, y, z) -> null,
- (x) -> null));
- }
-
- @Test
- public void getProgressUnboundedRangeDelegates() {
- Progress progress = Progress.from(0, 0.2);
- when(tracker.getProgress()).thenReturn(progress);
- assertTrue(
- DoubleMath.fuzzyEquals(
- progress.getWorkRemaining(), sdf.getSize(PARTITION, RESTRICTION), .0001));
- verify(tracker).getProgress();
- }
-
- @Test
- public void getProgressBoundedReturnsBytes() {
- assertTrue(
- DoubleMath.fuzzyEquals(
- 123.0,
- sdf.getSize(PARTITION, OffsetByteRange.of(new OffsetRange(87, 8000), 123)),
- .0001));
- verifyNoInteractions(tracker);
+ MAX_SLEEP_TIME, x -> null, (x, y) -> null, (x, y, z) -> null, (x) -> null));
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
deleted file mode 100644
index e242942..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
-import static org.junit.Assert.fail;
-
-import com.google.cloud.pubsublite.AdminClient;
-import com.google.cloud.pubsublite.AdminClientSettings;
-import com.google.cloud.pubsublite.BacklogLocation;
-import com.google.cloud.pubsublite.CloudZone;
-import com.google.cloud.pubsublite.Message;
-import com.google.cloud.pubsublite.ProjectId;
-import com.google.cloud.pubsublite.SubscriptionName;
-import com.google.cloud.pubsublite.SubscriptionPath;
-import com.google.cloud.pubsublite.TopicName;
-import com.google.cloud.pubsublite.TopicPath;
-import com.google.cloud.pubsublite.proto.PubSubMessage;
-import com.google.cloud.pubsublite.proto.SequencedMessage;
-import com.google.cloud.pubsublite.proto.Subscription;
-import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig.DeliveryRequirement;
-import com.google.cloud.pubsublite.proto.Topic;
-import com.google.cloud.pubsublite.proto.Topic.PartitionConfig.Capacity;
-import com.google.errorprone.annotations.concurrent.GuardedBy;
-import com.google.protobuf.ByteString;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.FlatMapElements;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.joda.time.Duration;
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@RunWith(JUnit4.class)
-public class ReadWriteIT {
- private static final Logger LOG = LoggerFactory.getLogger(ReadWriteIT.class);
- private static final CloudZone ZONE = CloudZone.parse("us-central1-b");
- private static final int MESSAGE_COUNT = 90;
-
- @Rule public transient TestPipeline pipeline = TestPipeline.create();
-
- private static ProjectId getProject(PipelineOptions options) {
- return ProjectId.of(checkArgumentNotNull(options.as(GcpOptions.class).getProject()));
- }
-
- private static String randomName() {
- return "beam_it_resource_" + ThreadLocalRandom.current().nextLong();
- }
-
- private static AdminClient newAdminClient() {
- return AdminClient.create(AdminClientSettings.newBuilder().setRegion(ZONE.region()).build());
- }
-
- private final Deque<Runnable> cleanupActions = new ArrayDeque<>();
-
- private TopicPath createTopic(ProjectId id) throws Exception {
- TopicPath toReturn =
- TopicPath.newBuilder()
- .setProject(id)
- .setLocation(ZONE)
- .setName(TopicName.of(randomName()))
- .build();
- Topic.Builder topic = Topic.newBuilder().setName(toReturn.toString());
- topic
- .getPartitionConfigBuilder()
- .setCount(2)
- .setCapacity(Capacity.newBuilder().setPublishMibPerSec(4).setSubscribeMibPerSec(4));
- topic.getRetentionConfigBuilder().setPerPartitionBytes(30 * (1L << 30));
- cleanupActions.addLast(
- () -> {
- try (AdminClient client = newAdminClient()) {
- client.deleteTopic(toReturn).get();
- } catch (Throwable t) {
- LOG.error("Failed to clean up topic.", t);
- }
- });
- try (AdminClient client = newAdminClient()) {
- client.createTopic(topic.build()).get();
- }
- return toReturn;
- }
-
- private SubscriptionPath createSubscription(TopicPath topic) throws Exception {
- SubscriptionPath toReturn =
- SubscriptionPath.newBuilder()
- .setProject(topic.project())
- .setLocation(ZONE)
- .setName(SubscriptionName.of(randomName()))
- .build();
- Subscription.Builder subscription = Subscription.newBuilder().setName(toReturn.toString());
- subscription
- .getDeliveryConfigBuilder()
- .setDeliveryRequirement(DeliveryRequirement.DELIVER_IMMEDIATELY);
- subscription.setTopic(topic.toString());
- cleanupActions.addLast(
- () -> {
- try (AdminClient client = newAdminClient()) {
- client.deleteSubscription(toReturn).get();
- } catch (Throwable t) {
- LOG.error("Failed to clean up subscription.", t);
- }
- });
- try (AdminClient client = newAdminClient()) {
- client.createSubscription(subscription.build(), BacklogLocation.BEGINNING).get();
- }
- return toReturn;
- }
-
- @After
- public void tearDown() {
- while (!cleanupActions.isEmpty()) {
- cleanupActions.removeLast().run();
- }
- }
-
- // Workaround for BEAM-12867
- // TODO(BEAM-12867): Remove this.
- private static class CustomCreate extends PTransform<PCollection<Void>, PCollection<Integer>> {
- @Override
- public PCollection<Integer> expand(PCollection<Void> input) {
- return input.apply(
- "createIndexes",
- FlatMapElements.via(
- new SimpleFunction<Void, Iterable<Integer>>() {
- @Override
- public Iterable<Integer> apply(Void input) {
- return IntStream.range(0, MESSAGE_COUNT).boxed().collect(Collectors.toList());
- }
- }));
- }
- }
-
- public static void writeMessages(TopicPath topicPath, Pipeline pipeline) {
- PCollection<Void> trigger = pipeline.apply(Create.of((Void) null));
- PCollection<Integer> indexes = trigger.apply("createIndexes", new CustomCreate());
- PCollection<PubSubMessage> messages =
- indexes.apply(
- "createMessages",
- MapElements.via(
- new SimpleFunction<Integer, PubSubMessage>(
- index ->
- Message.builder()
- .setData(ByteString.copyFromUtf8(index.toString()))
- .build()
- .toProto()) {}));
- // Add UUIDs to messages for later deduplication.
- messages = messages.apply("addUuids", PubsubLiteIO.addUuids());
- messages.apply(
- "writeMessages",
- PubsubLiteIO.write(PublisherOptions.newBuilder().setTopicPath(topicPath).build()));
- }
-
- public static PCollection<SequencedMessage> readMessages(
- SubscriptionPath subscriptionPath, Pipeline pipeline) {
- PCollection<SequencedMessage> messages =
- pipeline.apply(
- "readMessages",
- PubsubLiteIO.read(
- SubscriberOptions.newBuilder()
- .setSubscriptionPath(subscriptionPath)
- // setMinBundleTimeout INTENDED FOR TESTING ONLY
- // This sacrifices efficiency to make tests run faster. Do not use this in a
- // real pipeline!
- .setMinBundleTimeout(Duration.standardSeconds(5))
- .build()));
- // Deduplicate messages based on the uuids added in PubsubLiteIO.addUuids() when writing.
- return messages.apply(
- "dedupeMessages", PubsubLiteIO.deduplicate(UuidDeduplicationOptions.newBuilder().build()));
- }
-
- // This static out of band communication is needed to retain serializability.
- @GuardedBy("ReadWriteIT.class")
- private static final List<SequencedMessage> received = new ArrayList<>();
-
- private static synchronized void addMessageReceived(SequencedMessage message) {
- received.add(message);
- }
-
- private static synchronized List<SequencedMessage> getTestQuickstartReceived() {
- return ImmutableList.copyOf(received);
- }
-
- private static PTransform<PCollection<? extends SequencedMessage>, PCollection<Void>>
- collectTestQuickstart() {
- return MapElements.via(
- new SimpleFunction<SequencedMessage, Void>() {
- @Override
- public Void apply(SequencedMessage input) {
- addMessageReceived(input);
- return null;
- }
- });
- }
-
- @Test
- public void testReadWrite() throws Exception {
- pipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
- pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);
-
- TopicPath topic = createTopic(getProject(pipeline.getOptions()));
- SubscriptionPath subscription = createSubscription(topic);
-
- // Publish some messages
- writeMessages(topic, pipeline);
-
- // Read some messages. They should be deduplicated by the time we see them, so there should be
- // exactly numMessages, one for every index in [0,MESSAGE_COUNT).
- PCollection<SequencedMessage> messages = readMessages(subscription, pipeline);
- messages.apply("messageReceiver", collectTestQuickstart());
- pipeline.run();
- LOG.info("Running!");
- for (int round = 0; round < 120; ++round) {
- Thread.sleep(1000);
- Map<Integer, Integer> receivedCounts = new HashMap<>();
- for (SequencedMessage message : getTestQuickstartReceived()) {
- int id = Integer.parseInt(message.getMessage().getData().toStringUtf8());
- receivedCounts.put(id, receivedCounts.getOrDefault(id, 0) + 1);
- }
- LOG.info("Performing comparison round {}.\n", round);
- boolean done = true;
- List<Integer> missing = new ArrayList<>();
- for (int id = 0; id < MESSAGE_COUNT; id++) {
- int idCount = receivedCounts.getOrDefault(id, 0);
- if (idCount == 0) {
- missing.add(id);
- done = false;
- }
- if (idCount > 1) {
- fail(String.format("Failed to deduplicate message with id %s.", id));
- }
- }
- LOG.info("Still messing messages: {}.\n", missing);
- if (done) {
- return;
- }
- }
- fail(
- String.format(
- "Failed to receive all messages after 2 minutes. Received %s messages.",
- getTestQuickstartReceived().size()));
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java
index 3d743758..dbf3b93 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java
@@ -31,6 +31,7 @@
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
+import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.Offset;
@@ -39,6 +40,7 @@
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
+import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.util.Timestamps;
import java.util.List;
@@ -62,7 +64,7 @@
@RunWith(JUnit4.class)
@SuppressWarnings("initialization.fields.uninitialized")
public class SubscriptionPartitionProcessorImplTest {
- @Spy RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;
+ @Spy RestrictionTracker<OffsetRange, OffsetByteProgress> tracker;
@Mock OutputReceiver<SequencedMessage> receiver;
@Mock Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory;
@@ -81,10 +83,6 @@
.build();
}
- private OffsetByteRange initialRange() {
- return OffsetByteRange.of(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
- }
-
@Before
public void setUp() {
initMocks(this);
@@ -102,11 +100,18 @@
@Test
public void lifecycle() throws Exception {
- when(tracker.currentRestriction()).thenReturn(initialRange());
+ when(tracker.currentRestriction())
+ .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
+ when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class)));
processor.start();
verify(subscriber).startAsync();
verify(subscriber).awaitRunning();
verify(subscriber)
+ .seek(
+ SeekRequest.newBuilder()
+ .setCursor(Cursor.newBuilder().setOffset(example(Offset.class).value()))
+ .build());
+ verify(subscriber)
.allowFlow(
FlowControlRequest.newBuilder()
.setAllowedBytes(DEFAULT_FLOW_CONTROL.bytesOutstanding())
@@ -118,15 +123,29 @@
}
@Test
- public void lifecycleFlowControlThrows() throws Exception {
- when(tracker.currentRestriction()).thenReturn(initialRange());
+ public void lifecycleSeekThrows() throws Exception {
+ when(tracker.currentRestriction())
+ .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
+ when(subscriber.seek(any()))
+ .thenReturn(ApiFutures.immediateFailedFuture(new CheckedApiException(Code.OUT_OF_RANGE)));
doThrow(new CheckedApiException(Code.OUT_OF_RANGE)).when(subscriber).allowFlow(any());
assertThrows(CheckedApiException.class, () -> processor.start());
}
@Test
+ public void lifecycleFlowControlThrows() {
+ when(tracker.currentRestriction())
+ .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
+ when(subscriber.seek(any()))
+ .thenReturn(ApiFutures.immediateFailedFuture(new CheckedApiException(Code.OUT_OF_RANGE)));
+ assertThrows(CheckedApiException.class, () -> processor.start());
+ }
+
+ @Test
public void lifecycleSubscriberAwaitThrows() throws Exception {
- when(tracker.currentRestriction()).thenReturn(initialRange());
+ when(tracker.currentRestriction())
+ .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
+ when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class)));
processor.start();
doThrow(new CheckedApiException(Code.INTERNAL).underlying).when(subscriber).awaitTerminated();
assertThrows(ApiException.class, () -> processor.close());
@@ -136,19 +155,21 @@
@Test
public void subscriberFailureFails() throws Exception {
- when(tracker.currentRestriction()).thenReturn(initialRange());
+ when(tracker.currentRestriction())
+ .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
+ when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class)));
processor.start();
subscriber.fail(new CheckedApiException(Code.OUT_OF_RANGE));
ApiException e =
- assertThrows(
- // Longer wait is needed due to listener asynchrony.
- ApiException.class, () -> processor.waitForCompletion(Duration.standardSeconds(1)));
+ assertThrows(ApiException.class, () -> processor.waitForCompletion(Duration.ZERO));
assertEquals(Code.OUT_OF_RANGE, e.getStatusCode().getCode());
}
@Test
public void allowFlowFailureFails() throws Exception {
- when(tracker.currentRestriction()).thenReturn(initialRange());
+ when(tracker.currentRestriction())
+ .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE));
+ when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class)));
processor.start();
when(tracker.tryClaim(any())).thenReturn(true);
doThrow(new CheckedApiException(Code.OUT_OF_RANGE)).when(subscriber).allowFlow(any());