blob: 1258d0b9743d2c494e6b78a3f1b78bb1d2e41366 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.pubsub;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.joda.time.Duration;
/**
* A PTransform which streams messages to Pubsub.
*
* <ul>
* <li>The underlying implementation is just a {@link GroupByKey} followed by a {@link ParDo}
* which publishes as a side effect. (In the future we want to design and switch to a custom
* {@code UnboundedSink} implementation so as to gain access to system watermark and
* end-of-pipeline cleanup.)
* <li>We try to send messages in batches while also limiting send latency.
* <li>No stats are logged. Rather some counters are used to keep track of elements and batches.
* <li>Though some background threads are used by the underlying netty system all actual Pubsub
* calls are blocking. We rely on the underlying runner to allow multiple {@link DoFn}
* instances to execute concurrently and hide latency.
* <li>A failed bundle will cause messages to be resent. Thus we rely on the Pubsub consumer to
* dedup messages.
* </ul>
*/
public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, PDone> {
/**
 * Default maximum number of messages per publish. Used by the public constructor that does not
 * take an explicit batch size.
 */
static final int DEFAULT_PUBLISH_BATCH_SIZE = 1000;
/**
 * Default maximum size of a publish batch, in bytes. Used by the public constructor that does not
 * take an explicit byte limit.
 */
static final int DEFAULT_PUBLISH_BATCH_BYTES = 400000;
/**
 * Default longest delay between receiving a message and pushing it to Pubsub (two seconds). Used
 * by both public constructors.
 */
private static final Duration DEFAULT_MAX_LATENCY = Duration.standardSeconds(2);
/**
 * Coder used to move {@link OutgoingMessage}s between the internal stages of the sink.
 *
 * <p>The wire layout is, in order: payload bytes, nullable attribute map, timestamp (big-endian
 * long, milliseconds since epoch), nullable record id.
 */
private static class OutgoingMessageCoder extends AtomicCoder<OutgoingMessage> {
  private static final ByteArrayCoder PAYLOAD_CODER = ByteArrayCoder.of();
  private static final NullableCoder<Map<String, String>> ATTRIBUTES_CODER =
      NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
  private static final BigEndianLongCoder TIMESTAMP_CODER = BigEndianLongCoder.of();
  private static final NullableCoder<String> RECORD_ID_CODER =
      NullableCoder.of(StringUtf8Coder.of());

  /** Writes the four fields of {@code value} to {@code outStream} in wire-layout order. */
  @Override
  public void encode(OutgoingMessage value, OutputStream outStream)
      throws CoderException, IOException {
    PAYLOAD_CODER.encode(value.elementBytes, outStream);
    ATTRIBUTES_CODER.encode(value.attributes, outStream);
    TIMESTAMP_CODER.encode(value.timestampMsSinceEpoch, outStream);
    RECORD_ID_CODER.encode(value.recordId, outStream);
  }

  /** Reads the four fields back from {@code inStream} in the order {@link #encode} wrote them. */
  @Override
  public OutgoingMessage decode(InputStream inStream) throws CoderException, IOException {
    // Constructor arguments are evaluated left to right, which matches the wire layout.
    return new OutgoingMessage(
        PAYLOAD_CODER.decode(inStream),
        ATTRIBUTES_CODER.decode(inStream),
        TIMESTAMP_CODER.decode(inStream),
        RECORD_ID_CODER.decode(inStream));
  }
}

/** Shared coder instance for {@link OutgoingMessage}; installed on the sharded collection. */
@VisibleForTesting static final Coder<OutgoingMessage> CODER = new OutgoingMessageCoder();
// ================================================================================
// RecordIdMethod
// ================================================================================
/**
 * Specify how record ids are to be generated for outgoing messages.
 *
 * <p>The constant order is left as-is; do not reorder without checking for ordinal-dependent
 * uses elsewhere.
 */
@VisibleForTesting
enum RecordIdMethod {
/** Leave the record id {@code null}. */
NONE,
/** Generate randomly: each message gets a fresh random {@link UUID} string. */
RANDOM,
/**
 * Generate deterministically from a murmur3 128-bit hash of the message payload. For testing
 * only.
 */
DETERMINISTIC
}
// ================================================================================
// ShardFn
// ================================================================================
/**
 * Turns each incoming {@link PubsubMessage} into an {@link OutgoingMessage} keyed by a randomly
 * chosen shard number in {@code [0, numShards)}.
 */
private static class ShardFn extends DoFn<PubsubMessage, KV<Integer, OutgoingMessage>> {
  private final Counter elementCounter = Metrics.counter(ShardFn.class, "elements");
  private final int numShards;
  private final RecordIdMethod recordIdMethod;

  ShardFn(int numShards, RecordIdMethod recordIdMethod) {
    this.numShards = numShards;
    this.recordIdMethod = recordIdMethod;
  }

  /** Produces the record id for {@code payload} according to the configured method. */
  @Nullable
  private String recordIdFor(byte[] payload) {
    switch (recordIdMethod) {
      case DETERMINISTIC:
        return Hashing.murmur3_128().hashBytes(payload).toString();
      case RANDOM:
        // Since these elements go through a GroupByKey, any failures while sending to
        // Pubsub will be retried without falling back and generating a new record id.
        // Thus even though we may send the same message to Pubsub twice, it is guaranteed
        // to have the same record id.
        return UUID.randomUUID().toString();
      case NONE:
      default:
        return null;
    }
  }

  /** Counts the element, converts it, and emits it under a random shard key. */
  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    elementCounter.inc();

    PubsubMessage incoming = c.element();
    byte[] payload = incoming.getPayload();
    Map<String, String> attributeMap = incoming.getAttributeMap();
    long eventTimeMs = c.timestamp().getMillis();
    String recordId = recordIdFor(payload);

    int shard = ThreadLocalRandom.current().nextInt(numShards);
    OutgoingMessage outgoing = new OutgoingMessage(payload, attributeMap, eventTimeMs, recordId);
    c.output(KV.of(shard, outgoing));
  }

  @Override
  public void populateDisplayData(Builder builder) {
    super.populateDisplayData(builder);
    builder.add(DisplayData.item("numShards", numShards));
  }
}
// ================================================================================
// WriterFn
// ================================================================================
/**
 * Publish messages to Pubsub in batches.
 *
 * <p>Each input element is the group of messages collected for one shard key. The group is
 * split into publish requests so that no request holds more than {@code publishBatchSize}
 * messages, and so that adding a message to a non-empty request never pushes its payload bytes
 * past {@code publishBatchBytes}. A single message larger than {@code publishBatchBytes} is
 * still sent, alone in its own request.
 *
 * <p>A {@link PubsubClient} is created per bundle in {@link #startBundle} and released in
 * {@link #finishBundle}. If the bundle fails before {@link #finishBundle} runs, the client is
 * released in {@link #teardown} instead so that it is not leaked.
 */
private static class WriterFn extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
  private final PubsubClientFactory pubsubFactory;
  private final ValueProvider<TopicPath> topic;
  @Nullable private final String timestampAttribute;
  @Nullable private final String idAttribute;
  private final int publishBatchSize;
  private final int publishBatchBytes;

  /** Client on which to talk to Pubsub. Null until created by {@link #startBundle}. */
  @Nullable private transient PubsubClient pubsubClient;

  private final Counter batchCounter = Metrics.counter(WriterFn.class, "batches");
  private final Counter elementCounter = SinkMetrics.elementsWritten();
  private final Counter byteCounter = SinkMetrics.bytesWritten();

  WriterFn(
      PubsubClientFactory pubsubFactory,
      ValueProvider<TopicPath> topic,
      @Nullable String timestampAttribute,
      @Nullable String idAttribute,
      int publishBatchSize,
      int publishBatchBytes) {
    this.pubsubFactory = pubsubFactory;
    this.topic = topic;
    this.timestampAttribute = timestampAttribute;
    this.idAttribute = idAttribute;
    this.publishBatchSize = publishBatchSize;
    this.publishBatchBytes = publishBatchBytes;
  }

  /**
   * BLOCKING Send {@code messages} as a batch to Pubsub and update the counters.
   *
   * @param messages the batch to publish
   * @param bytes total payload bytes in {@code messages}, used only for the byte counter
   * @throws IllegalStateException if the client reports fewer published messages than were sent
   */
  private void publishBatch(List<OutgoingMessage> messages, int bytes) throws IOException {
    int n = pubsubClient.publish(topic.get(), messages);
    checkState(
        n == messages.size(),
        "Attempted to publish %s messages but %s were successful",
        messages.size(),
        n);
    batchCounter.inc();
    elementCounter.inc(messages.size());
    byteCounter.inc(bytes);
  }

  /** Creates the per-bundle client; fails if a previous bundle's client is still present. */
  @StartBundle
  public void startBundle(StartBundleContext c) throws Exception {
    checkState(pubsubClient == null, "startBundle invoked without prior finishBundle");
    pubsubClient =
        pubsubFactory.newClient(
            timestampAttribute, idAttribute, c.getPipelineOptions().as(PubsubOptions.class));
  }

  /** Publishes one shard's group of messages, splitting it into size-limited batches. */
  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    List<OutgoingMessage> pubsubMessages = new ArrayList<>(publishBatchSize);
    int bytes = 0;
    for (OutgoingMessage message : c.element().getValue()) {
      int messageBytes = message.elementBytes.length;
      // Compare in long arithmetic so a byte limit near Integer.MAX_VALUE cannot overflow.
      boolean wouldExceedBytes = (long) bytes + messageBytes > publishBatchBytes;
      // The upstream trigger fires at *at least* publishBatchSize elements, so a group may be
      // larger than that. Pubsub limits the number of messages per publish request as well
      // as the request size, so enforce the count here too.
      boolean reachedCount = pubsubMessages.size() >= publishBatchSize;
      if (!pubsubMessages.isEmpty() && (wouldExceedBytes || reachedCount)) {
        // BLOCKS until published.
        publishBatch(pubsubMessages, bytes);
        pubsubMessages.clear();
        bytes = 0;
      }
      pubsubMessages.add(message);
      bytes += messageBytes;
    }
    if (!pubsubMessages.isEmpty()) {
      // BLOCKS until published.
      publishBatch(pubsubMessages, bytes);
    }
  }

  /** Releases the per-bundle client after a successful bundle. */
  @FinishBundle
  public void finishBundle() throws Exception {
    closeClient();
  }

  /**
   * Releases the client if a failed bundle left one open (in which case {@link #finishBundle}
   * was never reached). Does nothing after a normally finished bundle.
   */
  @Teardown
  public void teardown() throws Exception {
    closeClient();
  }

  /**
   * Closes the current client, if any. The field is cleared before closing so that a failing
   * {@code close()} cannot leave a stale client behind for the next {@link #startBundle}.
   */
  private void closeClient() throws Exception {
    PubsubClient client = pubsubClient;
    pubsubClient = null;
    if (client != null) {
      client.close();
    }
  }

  @Override
  public void populateDisplayData(Builder builder) {
    super.populateDisplayData(builder);
    builder.add(DisplayData.item("topic", topic));
    builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
    builder.addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute));
    builder.addIfNotNull(DisplayData.item("idAttribute", idAttribute));
  }
}
// ================================================================================
// PubsubUnboundedSink
// ================================================================================
/** Which factory to use for creating Pubsub transport. */
private final PubsubClientFactory pubsubFactory;
/** Pubsub topic to publish to. */
private final ValueProvider<TopicPath> topic;
/**
 * Pubsub metadata field holding timestamp of each element, or {@literal null} if should use
 * Pubsub message publish timestamp instead.
 */
@Nullable private final String timestampAttribute;
/**
 * Pubsub metadata field holding id for each element, or {@literal null} if need to generate a
 * unique id ourselves.
 */
@Nullable private final String idAttribute;
/**
 * Number of 'shards' to use so that latency in Pubsub publish can be hidden. Generally this
 * should be a small multiple of the number of available cores. Too small a number results in too
 * much time lost to blocking Pubsub calls. Too large a number results in too many single-element
 * batches being sent to Pubsub with high per-batch overhead.
 */
private final int numShards;
/** Maximum number of messages per publish. */
private final int publishBatchSize;
/** Maximum size of a publish batch, in bytes. */
private final int publishBatchBytes;
/** Longest delay between receiving a message and pushing it to Pubsub. */
private final Duration maxLatency;
/**
 * How record ids should be generated for each record (if {@link #idAttribute} is non-{@literal
 * null}). When {@link #idAttribute} is {@literal null} the constructor stores {@link
 * RecordIdMethod#NONE} here regardless of the method requested.
 */
private final RecordIdMethod recordIdMethod;
/**
 * Creates a sink with every setting supplied explicitly.
 *
 * @param pubsubFactory factory used to create the Pubsub transport
 * @param topic topic to publish to
 * @param timestampAttribute Pubsub metadata field holding each element's timestamp, or {@code
 *     null} to use the Pubsub publish timestamp
 * @param idAttribute Pubsub metadata field holding each element's id, or {@code null}
 * @param numShards number of shards messages are spread over; must be positive
 * @param publishBatchSize maximum number of messages per publish
 * @param publishBatchBytes maximum size of a publish batch, in bytes
 * @param maxLatency longest delay between receiving a message and pushing it to Pubsub
 * @param recordIdMethod how record ids are generated; ignored (treated as {@link
 *     RecordIdMethod#NONE}) when {@code idAttribute} is {@code null}
 * @throws IllegalArgumentException if {@code numShards} is not positive
 */
@VisibleForTesting
PubsubUnboundedSink(
    PubsubClientFactory pubsubFactory,
    ValueProvider<TopicPath> topic,
    @Nullable String timestampAttribute,
    @Nullable String idAttribute,
    int numShards,
    int publishBatchSize,
    int publishBatchBytes,
    Duration maxLatency,
    RecordIdMethod recordIdMethod) {
  // Shard keys are drawn with ThreadLocalRandom.nextInt(numShards), which rejects a
  // non-positive bound. Fail here, at construction, rather than on the first element.
  if (numShards <= 0) {
    throw new IllegalArgumentException("numShards must be positive, but was " + numShards);
  }
  this.pubsubFactory = pubsubFactory;
  this.topic = topic;
  this.timestampAttribute = timestampAttribute;
  this.idAttribute = idAttribute;
  this.numShards = numShards;
  this.publishBatchSize = publishBatchSize;
  this.publishBatchBytes = publishBatchBytes;
  this.maxLatency = maxLatency;
  // Without an id attribute there is nowhere to put a record id, so none is generated.
  this.recordIdMethod = idAttribute == null ? RecordIdMethod.NONE : recordIdMethod;
}
/**
 * Creates a sink that uses {@link #DEFAULT_PUBLISH_BATCH_SIZE} and {@link
 * #DEFAULT_PUBLISH_BATCH_BYTES} as its batch limits, with the default maximum latency and
 * randomly generated record ids.
 */
public PubsubUnboundedSink(
    PubsubClientFactory pubsubFactory,
    ValueProvider<TopicPath> topic,
    String timestampAttribute,
    String idAttribute,
    int numShards) {
  // The seven-argument overload fills in the default latency and the random record id method.
  this(
      pubsubFactory,
      topic,
      timestampAttribute,
      idAttribute,
      numShards,
      DEFAULT_PUBLISH_BATCH_SIZE,
      DEFAULT_PUBLISH_BATCH_BYTES);
}
/**
 * Creates a sink with caller-chosen batch limits. The maximum latency is {@link
 * #DEFAULT_MAX_LATENCY} and {@link RecordIdMethod#RANDOM} is requested for record ids.
 *
 * @param pubsubFactory factory used to create the Pubsub transport
 * @param topic topic to publish to
 * @param timestampAttribute Pubsub metadata field holding each element's timestamp, or {@code
 *     null} to use the Pubsub publish timestamp
 * @param idAttribute Pubsub metadata field holding each element's id, or {@code null}
 * @param numShards number of shards messages are spread over
 * @param publishBatchSize maximum number of messages per publish
 * @param publishBatchBytes maximum size of a publish batch, in bytes
 */
public PubsubUnboundedSink(
PubsubClientFactory pubsubFactory,
ValueProvider<TopicPath> topic,
String timestampAttribute,
String idAttribute,
int numShards,
int publishBatchSize,
int publishBatchBytes) {
this(
pubsubFactory,
topic,
timestampAttribute,
idAttribute,
numShards,
publishBatchSize,
publishBatchBytes,
DEFAULT_MAX_LATENCY,
RecordIdMethod.RANDOM);
}
/**
 * Get the topic being written to.
 *
 * <p>Resolves the topic by calling {@code get()} on the underlying {@link ValueProvider}.
 * NOTE(review): presumably this is only usable once the provider's value is available; callers
 * that cannot guarantee that should use {@link #getTopicProvider()} instead. Confirm against
 * the {@link ValueProvider} contract.
 */
public TopicPath getTopic() {
return topic.get();
}
/**
 * Get the {@link ValueProvider} for the topic being written to. The provider is returned as-is,
 * without resolving its value.
 */
public ValueProvider<TopicPath> getTopicProvider() {
return topic;
}
/**
 * Get the timestamp attribute: the Pubsub metadata field holding the timestamp of each element,
 * or {@code null} if the Pubsub message publish timestamp is used instead.
 */
@Nullable
public String getTimestampAttribute() {
return timestampAttribute;
}
/**
 * Get the id attribute: the Pubsub metadata field holding the id of each element, or {@code
 * null} if none was configured.
 */
@Nullable
public String getIdAttribute() {
return idAttribute;
}
/**
 * Wires up the sink: re-window the input for batching, assign each message to a shard, group by
 * shard, and publish each group as a side effect.
 */
@Override
public PDone expand(PCollection<PubsubMessage> input) {
  // Fire a pane as soon as either publishBatchSize elements have arrived or maxLatency has
  // passed since the first element of the pane; fired elements are not emitted again.
  Window<PubsubMessage> batchingWindow =
      Window.<PubsubMessage>into(new GlobalWindows())
          .triggering(
              Repeatedly.forever(
                  AfterFirst.of(
                      AfterPane.elementCountAtLeast(publishBatchSize),
                      AfterProcessingTime.pastFirstElementInPane().plusDelayOf(maxLatency))))
          .discardingFiredPanes();
  PCollection<PubsubMessage> windowed = input.apply("PubsubUnboundedSink.Window", batchingWindow);

  PCollection<KV<Integer, OutgoingMessage>> sharded =
      windowed
          .apply("PubsubUnboundedSink.Shard", ParDo.of(new ShardFn(numShards, recordIdMethod)))
          .setCoder(KvCoder.of(VarIntCoder.of(), CODER));

  PCollection<KV<Integer, Iterable<OutgoingMessage>>> grouped =
      sharded.apply(GroupByKey.<Integer, OutgoingMessage>create());

  WriterFn writer =
      new WriterFn(
          pubsubFactory,
          topic,
          timestampAttribute,
          idAttribute,
          publishBatchSize,
          publishBatchBytes);
  grouped.apply("PubsubUnboundedSink.Writer", ParDo.of(writer));

  return PDone.in(input.getPipeline());
}
}