blob: fd0f0174738fbf1fc571adbc3c04fbe864c9c01a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.pubsub;
import static junit.framework.TestCase.assertFalse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import com.google.api.client.util.Clock;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.PubsubCheckpoint;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.PubsubReader;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.PubsubSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
 * Tests for {@link PubsubUnboundedSource}.
 *
 * <p>Tests read from a {@link PubsubTestClient} whose {@link Clock} reads {@link #now}, so ack
 * deadline expiry and extension are simulated by advancing {@code now} (in milliseconds) and then
 * calling {@link PubsubTestClient#advance()}.
 */
@RunWith(JUnit4.class)
public class PubsubUnboundedSourceTest {
  private static final SubscriptionPath SUBSCRIPTION =
      PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
  private static final String DATA = "testData";
  private static final long TIMESTAMP = 1234L;
  private static final long REQ_TIME = 6373L;
  private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
  private static final String ID_ATTRIBUTE = "id";
  private static final String ACK_ID = "testAckId";
  private static final String RECORD_ID = "testRecordId";
  /** Ack timeout, in seconds, passed to the pull test factory. */
  private static final int ACK_TIMEOUT_S = 60;

  /** Current simulated time in milliseconds; the test {@link #clock} reads this value. */
  private AtomicLong now;

  private Clock clock;
  private PubsubTestClientFactory factory;
  private PubsubSource primSource;

  @Rule public TestPipeline p = TestPipeline.create();

  /**
   * Creates a pull test factory serving {@code incoming} on {@link #SUBSCRIPTION}, and a {@link
   * PubsubSource} (stored in {@link #primSource}) that reads from it. The simulated clock starts at
   * {@link #REQ_TIME}.
   *
   * @param incoming the messages the test client will hand out, in order; may be empty
   */
  private void setupMessages(Iterable<IncomingMessage> incoming) {
    now = new AtomicLong(REQ_TIME);
    clock = () -> now.get();
    factory = PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, incoming);
    PubsubUnboundedSource source =
        new PubsubUnboundedSource(
            clock,
            factory,
            null,
            null,
            StaticValueProvider.of(SUBSCRIPTION),
            TIMESTAMP_ATTRIBUTE,
            ID_ATTRIBUTE,
            true /* needsAttributes */);
    primSource = new PubsubSource(source);
  }

  /** Sets up a source that delivers exactly one message whose payload is {@link #DATA}. */
  private void setupOneMessage() {
    setupMessages(
        ImmutableList.of(
            new IncomingMessage(
                DATA.getBytes(StandardCharsets.UTF_8), null, TIMESTAMP, 0, ACK_ID, RECORD_ID)));
  }

  /** Closes the factory created by the test (if any) and clears per-test state. */
  @After
  public void after() throws IOException {
    // A test may fail before creating a factory; don't mask that failure with an NPE here.
    if (factory != null) {
      factory.close();
    }
    now = null;
    clock = null;
    primSource = null;
    factory = null;
  }

  /** Decodes a message payload as UTF-8. */
  private static String data(PubsubMessage message) {
    return new String(message.getPayload(), StandardCharsets.UTF_8);
  }

  @Test
  public void checkpointCoderIsSane() {
    setupMessages(ImmutableList.of());
    CoderProperties.coderSerializable(primSource.getCheckpointMarkCoder());
    // Since we only serialize/deserialize the 'notYetReadIds', and we don't want to make
    // equals on checkpoints ignore those fields, we'll test serialization and deserialization
    // of checkpoints in multipleReaders below.
  }

  @Test
  public void readOneMessage() throws IOException {
    setupOneMessage();
    PubsubReader reader = primSource.createReader(p.getOptions(), null);
    // Read one message.
    assertTrue(reader.start());
    assertEquals(DATA, data(reader.getCurrent()));
    assertFalse(reader.advance());
    // ACK the message.
    PubsubCheckpoint checkpoint = reader.getCheckpointMark();
    checkpoint.finalizeCheckpoint();
    reader.close();
  }

  @Test
  public void timeoutAckAndRereadOneMessage() throws IOException {
    setupOneMessage();
    PubsubReader reader = primSource.createReader(p.getOptions(), null);
    PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
    assertTrue(reader.start());
    assertEquals(DATA, data(reader.getCurrent()));
    // Let the ACK deadline for the above expire: 65s is past the 60s ACK_TIMEOUT_S.
    now.addAndGet(65 * 1000);
    pubsubClient.advance();
    // We'll now receive the same message again.
    assertTrue(reader.advance());
    assertEquals(DATA, data(reader.getCurrent()));
    assertFalse(reader.advance());
    // Now ACK the message.
    PubsubCheckpoint checkpoint = reader.getCheckpointMark();
    checkpoint.finalizeCheckpoint();
    reader.close();
  }

  @Test
  public void extendAck() throws IOException {
    setupOneMessage();
    PubsubReader reader = primSource.createReader(p.getOptions(), null);
    PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
    // Pull the first message but don't take a checkpoint for it.
    assertTrue(reader.start());
    assertEquals(DATA, data(reader.getCurrent()));
    // Advance to shortly before the 60s deadline; the reader extends the ack, so no redelivery.
    now.addAndGet(55 * 1000);
    pubsubClient.advance();
    assertFalse(reader.advance());
    // Extend the ack again.
    now.addAndGet(25 * 1000);
    pubsubClient.advance();
    assertFalse(reader.advance());
    // Now ACK the message.
    PubsubCheckpoint checkpoint = reader.getCheckpointMark();
    checkpoint.finalizeCheckpoint();
    reader.close();
  }

  @Test
  public void timeoutAckExtensions() throws IOException {
    setupOneMessage();
    PubsubReader reader = primSource.createReader(p.getOptions(), null);
    PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();
    // Pull the first message but don't take a checkpoint for it.
    assertTrue(reader.start());
    assertEquals(DATA, data(reader.getCurrent()));
    // Extend the ack.
    now.addAndGet(55 * 1000);
    pubsubClient.advance();
    assertFalse(reader.advance());
    // Let the ack expire.
    for (int i = 0; i < 3; i++) {
      now.addAndGet(25 * 1000);
      pubsubClient.advance();
      assertFalse(reader.advance());
    }
    // Wait for resend.
    now.addAndGet(25 * 1000);
    pubsubClient.advance();
    // Reread the same message.
    assertTrue(reader.advance());
    assertEquals(DATA, data(reader.getCurrent()));
    // Now ACK the message.
    PubsubCheckpoint checkpoint = reader.getCheckpointMark();
    checkpoint.finalizeCheckpoint();
    reader.close();
  }

  @Test
  public void multipleReaders() throws IOException {
    List<IncomingMessage> incoming = new ArrayList<>();
    for (int i = 0; i < 2; i++) {
      String data = String.format("data_%d", i);
      String ackid = String.format("ackid_%d", i);
      incoming.add(
          new IncomingMessage(
              data.getBytes(StandardCharsets.UTF_8), null, TIMESTAMP, 0, ackid, RECORD_ID));
    }
    setupMessages(incoming);
    PubsubReader reader = primSource.createReader(p.getOptions(), null);
    // Consume two messages, only read one.
    assertTrue(reader.start());
    assertEquals("data_0", data(reader.getCurrent()));

    // Grab checkpoint.
    PubsubCheckpoint checkpoint = reader.getCheckpointMark();
    checkpoint.finalizeCheckpoint();
    assertEquals(1, checkpoint.notYetReadIds.size());
    assertEquals("ackid_1", checkpoint.notYetReadIds.get(0));

    // Read second message.
    assertTrue(reader.advance());
    assertEquals("data_1", data(reader.getCurrent()));

    // Restore from checkpoint.
    byte[] checkpointBytes =
        CoderUtils.encodeToByteArray(primSource.getCheckpointMarkCoder(), checkpoint);
    checkpoint =
        CoderUtils.decodeFromByteArray(primSource.getCheckpointMarkCoder(), checkpointBytes);
    assertEquals(1, checkpoint.notYetReadIds.size());
    assertEquals("ackid_1", checkpoint.notYetReadIds.get(0));

    // The first reader is replaced below; close it so its client is not leaked.
    reader.close();

    // Re-read second message.
    reader = primSource.createReader(p.getOptions(), checkpoint);
    assertTrue(reader.start());
    assertEquals("data_1", data(reader.getCurrent()));

    // We are done.
    assertFalse(reader.advance());

    // ACK final message.
    checkpoint = reader.getCheckpointMark();
    checkpoint.finalizeCheckpoint();
    reader.close();
  }

  /** Maps a message number to its event timestamp; timestamps increase with message number. */
  private long messageNumToTimestamp(int messageNum) {
    return TIMESTAMP + messageNum * 100;
  }

  @Test
  public void readManyMessages() throws IOException {
    Map<String, Integer> dataToMessageNum = new HashMap<>();

    final int m = 97;
    final int n = 10000;
    List<IncomingMessage> incoming = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      // Make the messages timestamps slightly out of order: numbering is reversed within each
      // consecutive run of m messages.
      int messageNum = ((i / m) * m) + (m - 1) - (i % m);
      String data = String.format("data_%d", messageNum);
      dataToMessageNum.put(data, messageNum);
      String recid = String.format("recordid_%d", messageNum);
      String ackId = String.format("ackid_%d", messageNum);
      incoming.add(
          new IncomingMessage(
              data.getBytes(StandardCharsets.UTF_8),
              null,
              messageNumToTimestamp(messageNum),
              0,
              ackId,
              recid));
    }
    setupMessages(incoming);

    PubsubReader reader = primSource.createReader(p.getOptions(), null);
    PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient();

    for (int i = 0; i < n; i++) {
      if (i == 0) {
        assertTrue(reader.start());
      } else {
        assertTrue(reader.advance());
      }
      // We'll checkpoint and ack within the 2min limit.
      now.addAndGet(30);
      pubsubClient.advance();
      String data = data(reader.getCurrent());
      Integer messageNum = dataToMessageNum.remove(data);
      // No duplicate messages.
      assertNotNull(messageNum);
      // Preserve timestamp.
      assertEquals(new Instant(messageNumToTimestamp(messageNum)), reader.getCurrentTimestamp());
      // Preserve record id.
      String recid = String.format("recordid_%d", messageNum);
      assertArrayEquals(recid.getBytes(StandardCharsets.UTF_8), reader.getCurrentRecordId());

      if (i % 1000 == 999) {
        // Estimated watermark can never get ahead of actual outstanding messages.
        long watermark = reader.getWatermark().getMillis();
        long minOutstandingTimestamp = Long.MAX_VALUE;
        for (Integer outstandingMessageNum : dataToMessageNum.values()) {
          minOutstandingTimestamp =
              Math.min(minOutstandingTimestamp, messageNumToTimestamp(outstandingMessageNum));
        }
        assertThat(watermark, lessThanOrEqualTo(minOutstandingTimestamp));
        // Ack messages, but only every other finalization.
        PubsubCheckpoint checkpoint = reader.getCheckpointMark();
        if (i % 2000 == 1999) {
          checkpoint.finalizeCheckpoint();
        }
      }
    }
    // We are done.
    assertFalse(reader.advance());
    // We saw each message exactly once.
    assertTrue(dataToMessageNum.isEmpty());
    reader.close();
  }

  @Test
  public void noSubscriptionSplitGeneratesSubscription() throws Exception {
    TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic");
    factory = PubsubTestClient.createFactoryForCreateSubscription();
    PubsubUnboundedSource source =
        new PubsubUnboundedSource(
            factory,
            StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")),
            StaticValueProvider.of(topicPath),
            null /* subscription */,
            null /* timestampLabel */,
            null /* idLabel */,
            false /* needsAttributes */);
    assertThat(source.getSubscription(), nullValue());

    PipelineOptions options = PipelineOptionsFactory.create();
    List<PubsubSource> splits = (new PubsubSource(source)).split(3, options);
    // We have at least one returned split
    assertThat(splits, hasSize(greaterThan(0)));
    for (PubsubSource split : splits) {
      // Each split is equal
      assertThat(split, equalTo(splits.get(0)));
    }

    assertThat(splits.get(0).subscriptionPath, not(nullValue()));
  }

  @Test
  public void noSubscriptionNoSplitGeneratesSubscription() throws Exception {
    TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic");
    factory = PubsubTestClient.createFactoryForCreateSubscription();
    PubsubUnboundedSource source =
        new PubsubUnboundedSource(
            factory,
            StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")),
            StaticValueProvider.of(topicPath),
            null /* subscription */,
            null /* timestampLabel */,
            null /* idLabel */,
            false /* needsAttributes */);
    assertThat(source.getSubscription(), nullValue());

    PipelineOptions options = PipelineOptionsFactory.create();
    PubsubSource actualSource = new PubsubSource(source);
    PubsubReader reader = actualSource.createReader(options, null);
    SubscriptionPath createdSubscription = reader.subscription;
    assertThat(createdSubscription, not(nullValue()));

    PubsubCheckpoint checkpoint = reader.getCheckpointMark();
    assertThat(checkpoint.subscriptionPath, equalTo(createdSubscription.getPath()));

    checkpoint.finalizeCheckpoint();
    PubsubCheckpoint deserCheckpoint =
        CoderUtils.clone(actualSource.getCheckpointMarkCoder(), checkpoint);
    assertThat(checkpoint.subscriptionPath, not(nullValue()));
    assertThat(checkpoint.subscriptionPath, equalTo(deserCheckpoint.subscriptionPath));

    PubsubReader readerFromOriginal = actualSource.createReader(options, checkpoint);
    PubsubReader readerFromDeser = actualSource.createReader(options, deserCheckpoint);

    assertThat(readerFromOriginal.subscription, equalTo(createdSubscription));
    assertThat(readerFromDeser.subscription, equalTo(createdSubscription));

    // Release every reader's client; none of them has an outstanding checkpoint.
    reader.close();
    readerFromOriginal.close();
    readerFromDeser.close();
  }

  /** Tests that checkpoints finalized after the reader is closed succeed. */
  @Test
  public void closeWithActiveCheckpoints() throws Exception {
    setupOneMessage();
    PubsubReader reader = primSource.createReader(p.getOptions(), null);
    reader.start();
    PubsubCheckpoint checkpoint = reader.getCheckpointMark();
    reader.close();
    checkpoint.finalizeCheckpoint();
  }
}