blob: 6b20b56b0b3c5f209dcd71330a93a580bee27e8f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.pubsub;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import com.google.api.client.util.Clock;
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
/**
* A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for
* testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline}
* methods. Relies on statics to mimic the Pubsub service, though we try to hide that.
*/
@Experimental
public class PubsubTestClient extends PubsubClient implements Serializable {
/**
* Mimic the state of the simulated Pubsub 'service'.
*
* <p>Note that the {@link PubsubTestClientFactory} is serialized/deserialized even when running
* test pipelines. Meanwhile it is valid for multiple {@link PubsubTestClient}s to be created from
* the same client factory and run in parallel. Thus we can't enforce aliasing of the following
* data structures over all clients and must resort to a static.
*/
private static class State {
/** True if has been primed for a test but not yet validated. */
boolean isActive;
/** Publish mode only: Only publish calls for this topic are allowed. */
@Nullable TopicPath expectedTopic;
/** Publish mode only: Messages yet to seen in a {@link #publish} call. */
@Nullable Set<OutgoingMessage> remainingExpectedOutgoingMessages;
/**
* Publish mode only: Messages which should throw when first sent to simulate transient publish
* failure.
*/
@Nullable Set<OutgoingMessage> remainingFailingOutgoingMessages;
/** Pull mode only: Clock from which to get current time. */
@Nullable Clock clock;
/** Pull mode only: Only pull calls for this subscription are allowed. */
@Nullable SubscriptionPath expectedSubscription;
/** Pull mode only: Timeout to simulate. */
int ackTimeoutSec;
/** Pull mode only: Messages waiting to be received by a {@link #pull} call. */
@Nullable List<IncomingMessage> remainingPendingIncomingMessages;
/**
* Pull mode only: Messages which have been returned from a {@link #pull} call and not yet ACKed
* by an {@link #acknowledge} call.
*/
@Nullable Map<String, IncomingMessage> pendingAckIncomingMessages;
/** Pull mode only: When above messages are due to have their ACK deadlines expire. */
@Nullable Map<String, Long> ackDeadline;
}
private static final State STATE = new State();
/** Closing the factory will validate all expected messages were processed. */
public interface PubsubTestClientFactory extends PubsubClientFactory, Closeable, Serializable {}
/**
* Return a factory for testing publishers. Only one factory may be in-flight at a time. The
* factory must be closed when the test is complete, at which point final validation will occur.
*/
public static PubsubTestClientFactory createFactoryForPublish(
final TopicPath expectedTopic,
final Iterable<OutgoingMessage> expectedOutgoingMessages,
final Iterable<OutgoingMessage> failingOutgoingMessages) {
synchronized (STATE) {
checkState(!STATE.isActive, "Test still in flight");
STATE.expectedTopic = expectedTopic;
STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages);
STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages);
STATE.isActive = true;
}
return new PubsubTestClientFactory() {
@Override
public PubsubClient newClient(
@Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
throws IOException {
return new PubsubTestClient();
}
@Override
public String getKind() {
return "PublishTest";
}
@Override
public void close() {
synchronized (STATE) {
checkState(STATE.isActive, "No test still in flight");
checkState(
STATE.remainingExpectedOutgoingMessages.isEmpty(),
"Still waiting for %s messages to be published",
STATE.remainingExpectedOutgoingMessages.size());
STATE.isActive = false;
STATE.remainingExpectedOutgoingMessages = null;
}
}
};
}
/**
* Return a factory for testing subscribers. Only one factory may be in-flight at a time. The
* factory must be closed when the test in complete
*/
public static PubsubTestClientFactory createFactoryForPull(
final Clock clock,
final SubscriptionPath expectedSubscription,
final int ackTimeoutSec,
final Iterable<IncomingMessage> expectedIncomingMessages) {
synchronized (STATE) {
checkState(!STATE.isActive, "Test still in flight");
STATE.clock = clock;
STATE.expectedSubscription = expectedSubscription;
STATE.ackTimeoutSec = ackTimeoutSec;
STATE.remainingPendingIncomingMessages = Lists.newArrayList(expectedIncomingMessages);
STATE.pendingAckIncomingMessages = new HashMap<>();
STATE.ackDeadline = new HashMap<>();
STATE.isActive = true;
}
return new PubsubTestClientFactory() {
@Override
public PubsubClient newClient(
@Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
throws IOException {
return new PubsubTestClient();
}
@Override
public String getKind() {
return "PullTest";
}
@Override
public void close() {
synchronized (STATE) {
checkState(STATE.isActive, "No test still in flight");
checkState(
STATE.remainingPendingIncomingMessages.isEmpty(),
"Still waiting for %s messages to be pulled",
STATE.remainingPendingIncomingMessages.size());
checkState(
STATE.pendingAckIncomingMessages.isEmpty(),
"Still waiting for %s messages to be ACKed",
STATE.pendingAckIncomingMessages.size());
checkState(
STATE.ackDeadline.isEmpty(),
"Still waiting for %s messages to be ACKed",
STATE.ackDeadline.size());
STATE.isActive = false;
STATE.remainingPendingIncomingMessages = null;
STATE.pendingAckIncomingMessages = null;
STATE.ackDeadline = null;
}
}
};
}
public static PubsubTestClientFactory createFactoryForCreateSubscription() {
return new PubsubTestClientFactory() {
int numCalls = 0;
@Override
public void close() throws IOException {
checkState(
numCalls == 1, "Expected exactly one subscription to be created, got %s", numCalls);
}
@Override
public PubsubClient newClient(
@Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
throws IOException {
return new PubsubTestClient() {
@Override
public void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds)
throws IOException {
checkState(numCalls == 0, "Expected at most one subscription to be created");
numCalls++;
}
};
}
@Override
public String getKind() {
return "CreateSubscriptionTest";
}
};
}
/** Return true if in pull mode. */
private boolean inPullMode() {
checkState(STATE.isActive, "No test is active");
return STATE.expectedSubscription != null;
}
/** Return true if in publish mode. */
private boolean inPublishMode() {
checkState(STATE.isActive, "No test is active");
return STATE.expectedTopic != null;
}
/**
* For subscription mode only: Track progression of time according to the {@link Clock} passed .
* This will simulate Pubsub expiring outstanding ACKs.
*/
public void advance() {
synchronized (STATE) {
checkState(inPullMode(), "Can only advance in pull mode");
// Any messages who's ACKs timed out are available for re-pulling.
Iterator<Map.Entry<String, Long>> deadlineItr = STATE.ackDeadline.entrySet().iterator();
while (deadlineItr.hasNext()) {
Map.Entry<String, Long> entry = deadlineItr.next();
if (entry.getValue() <= STATE.clock.currentTimeMillis()) {
STATE.remainingPendingIncomingMessages.add(
STATE.pendingAckIncomingMessages.remove(entry.getKey()));
deadlineItr.remove();
}
}
}
}
@Override
public void close() {}
@Override
public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
synchronized (STATE) {
checkState(inPublishMode(), "Can only publish in publish mode");
checkState(
topic.equals(STATE.expectedTopic),
"Topic %s does not match expected %s",
topic,
STATE.expectedTopic);
for (OutgoingMessage outgoingMessage : outgoingMessages) {
if (STATE.remainingFailingOutgoingMessages.remove(outgoingMessage)) {
throw new RuntimeException("Simulating failure for " + outgoingMessage);
}
checkState(
STATE.remainingExpectedOutgoingMessages.remove(outgoingMessage),
"Unexpected outgoing message %s",
outgoingMessage);
}
return outgoingMessages.size();
}
}
@Override
public List<IncomingMessage> pull(
long requestTimeMsSinceEpoch,
SubscriptionPath subscription,
int batchSize,
boolean returnImmediately)
throws IOException {
synchronized (STATE) {
checkState(inPullMode(), "Can only pull in pull mode");
long now = STATE.clock.currentTimeMillis();
checkState(
requestTimeMsSinceEpoch == now,
"Simulated time %s does not match request time %s",
now,
requestTimeMsSinceEpoch);
checkState(
subscription.equals(STATE.expectedSubscription),
"Subscription %s does not match expected %s",
subscription,
STATE.expectedSubscription);
checkState(returnImmediately, "Pull only supported if returning immediately");
List<IncomingMessage> incomingMessages = new ArrayList<>();
Iterator<IncomingMessage> pendItr = STATE.remainingPendingIncomingMessages.iterator();
while (pendItr.hasNext()) {
IncomingMessage incomingMessage = pendItr.next();
pendItr.remove();
IncomingMessage incomingMessageWithRequestTime =
incomingMessage.withRequestTime(requestTimeMsSinceEpoch);
incomingMessages.add(incomingMessageWithRequestTime);
STATE.pendingAckIncomingMessages.put(
incomingMessageWithRequestTime.ackId, incomingMessageWithRequestTime);
STATE.ackDeadline.put(
incomingMessageWithRequestTime.ackId,
requestTimeMsSinceEpoch + STATE.ackTimeoutSec * 1000);
if (incomingMessages.size() >= batchSize) {
break;
}
}
return incomingMessages;
}
}
@Override
public void acknowledge(SubscriptionPath subscription, List<String> ackIds) throws IOException {
synchronized (STATE) {
checkState(inPullMode(), "Can only acknowledge in pull mode");
checkState(
subscription.equals(STATE.expectedSubscription),
"Subscription %s does not match expected %s",
subscription,
STATE.expectedSubscription);
for (String ackId : ackIds) {
checkState(
STATE.ackDeadline.remove(ackId) != null,
"No message with ACK id %s is waiting for an ACK",
ackId);
checkState(
STATE.pendingAckIncomingMessages.remove(ackId) != null,
"No message with ACK id %s is waiting for an ACK",
ackId);
}
}
}
@Override
public void modifyAckDeadline(
SubscriptionPath subscription, List<String> ackIds, int deadlineSeconds) throws IOException {
synchronized (STATE) {
checkState(inPullMode(), "Can only modify ack deadline in pull mode");
checkState(
subscription.equals(STATE.expectedSubscription),
"Subscription %s does not match expected %s",
subscription,
STATE.expectedSubscription);
for (String ackId : ackIds) {
if (deadlineSeconds > 0) {
checkState(
STATE.ackDeadline.remove(ackId) != null,
"No message with ACK id %s is waiting for an ACK",
ackId);
checkState(
STATE.pendingAckIncomingMessages.containsKey(ackId),
"No message with ACK id %s is waiting for an ACK",
ackId);
STATE.ackDeadline.put(ackId, STATE.clock.currentTimeMillis() + deadlineSeconds * 1000);
} else {
checkState(
STATE.ackDeadline.remove(ackId) != null,
"No message with ACK id %s is waiting for an ACK",
ackId);
IncomingMessage message = STATE.pendingAckIncomingMessages.remove(ackId);
checkState(message != null, "No message with ACK id %s is waiting for an ACK", ackId);
STATE.remainingPendingIncomingMessages.add(message);
}
}
}
}
@Override
public void createTopic(TopicPath topic) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void deleteTopic(TopicPath topic) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public List<TopicPath> listTopics(ProjectPath project) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void deleteSubscription(SubscriptionPath subscription) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public List<SubscriptionPath> listSubscriptions(ProjectPath project, TopicPath topic)
throws IOException {
throw new UnsupportedOperationException();
}
@Override
public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException {
synchronized (STATE) {
return STATE.ackTimeoutSec;
}
}
@Override
public boolean isEOF() {
synchronized (STATE) {
checkState(inPullMode(), "Can only check EOF in pull mode");
return STATE.remainingPendingIncomingMessages.isEmpty();
}
}
}