blob: e798e87d7f497aaf02585361e2e3c2a314750ea1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package nativepubsubio contains a Golang implementation of streaming reads
// and writes to PubSub. This is not as fully featured as the cross-language
// pubsubio package present in the Beam Go repository and should not be used
// in place of it.
package nativepubsubio
import (
"context"
"errors"
"fmt"
"time"
"cloud.google.com/go/pubsub"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx"
)
// init registers this package's DoFns and the emitter signature used by
// pubSubRead.ProcessElement with Beam's register package.
func init() {
	// pubSubRead.ProcessElement: 5 inputs, 2 outputs.
	register.DoFn5x2[
		context.Context,
		beam.BundleFinalization,
		*sdf.LockRTracker,
		[]byte,
		func(beam.EventTime, []byte),
		sdf.ProcessContinuation,
		error,
	](&pubSubRead{})
	// pubSubWrite.ProcessElement: 2 inputs, 1 output.
	register.DoFn2x1[context.Context, []byte, error](&pubSubWrite{})
	// The (EventTime, []byte) emitter passed to pubSubRead.ProcessElement.
	register.Emitter2[beam.EventTime, []byte]()
}
// pubSubRead is a structural DoFn that reads messages from one PubSub
// subscription.
type pubSubRead struct {
	// ProjectID is the project passed to pubsub.NewClient.
	ProjectID string
	// Subscription is the subscription ID to read from. It also serves as the
	// DoFn's restriction.
	Subscription string

	// The unexported fields below are runtime state; newPubSubRead does not
	// set them.
	client            *pubsub.Client // populated by Setup
	processedMessages []*pubsub.Message
}
// newPubSubRead builds a pubSubRead DoFn for reading from topic in projectID.
//
// At construction time it checks that topic exists, then calls
// pubsubx.EnsureSubscription for the named subscription. Presumably that
// reuses an existing subscription of that name or creates a new one; confirm
// against pubsubx. The returned DoFn records the resulting subscription ID.
//
// It returns an error in these cases: topic is empty, the client cannot be
// created, the topic does not exist or cannot be checked, or the subscription
// cannot be ensured. The client used for these checks is closed before
// returning; the DoFn creates its own client in Setup.
func newPubSubRead(ctx context.Context, projectID, topic, subscription string) (*pubSubRead, error) {
	if topic == "" {
		return nil, errors.New("please provide a topic to read from")
	}
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return nil, err
	}
	// The client is only needed for the checks below; close it so its
	// connections are not leaked. Close errors are ignored because nothing
	// useful can be done with them at this point.
	defer client.Close()

	top := client.Topic(topic)
	ok, err := top.Exists(ctx)
	if err != nil {
		return nil, fmt.Errorf("checking whether topic %q exists: %w", topic, err)
	}
	if !ok {
		return nil, fmt.Errorf("topic %q does not exist", topic)
	}
	sub, err := pubsubx.EnsureSubscription(ctx, client, topic, subscription)
	if err != nil {
		return nil, fmt.Errorf("ensuring subscription %q to topic %q: %w", subscription, topic, err)
	}
	// sub.ID() is read before the deferred Close runs.
	return &pubSubRead{ProjectID: projectID, Subscription: sub.ID()}, nil
}
// CreateInitialRestriction uses the DoFn's subscription ID as the initial
// restriction. The impulse element is ignored.
func (r *pubSubRead) CreateInitialRestriction(_ []byte) string {
	subscriptionID := r.Subscription
	return subscriptionID
}
// CreateTracker builds a subscription tracker for rest with
// NewSubscriptionRTracker and wraps it in sdf.NewLockRTracker so that access
// to the tracker is mutex-guarded.
func (r *pubSubRead) CreateTracker(rest string) *sdf.LockRTracker {
	tracker := NewSubscriptionRTracker(rest)
	return sdf.NewLockRTracker(tracker)
}
// RestrictionSize reports a constant size of 1, because a restriction is
// always exactly one subscription.
func (r *pubSubRead) RestrictionSize(_ []byte, rest string) float64 {
	const oneSubscription = 1.0
	return oneSubscription
}
// SplitRestriction never splits. It returns rest as the only restriction,
// because a single subscription cannot be divided.
func (r *pubSubRead) SplitRestriction(_ []byte, rest string) []string {
	splits := make([]string, 0, 1)
	return append(splits, rest)
}
// Setup creates a PubSub client for r.ProjectID unless the DoFn already has
// one. It returns any client-creation error.
func (r *pubSubRead) Setup(ctx context.Context) error {
	if r.client != nil {
		return nil
	}
	client, err := pubsub.NewClient(ctx, r.ProjectID)
	if err != nil {
		return err
	}
	r.client = client
	return nil
}
// messageTimeout is how long pubSubRead.ProcessElement waits for the next
// message before cancelling the receive and scheduling a resumption.
var messageTimeout = 5 * time.Second
// ProcessElement claims the subscription restriction and reads from the PubSub
// subscription. Each message's data is emitted with the message's publish
// time as its event time.
//
// Messages are acknowledged only from a bundle finalization callback, which
// Beam runs after the bundle has been committed. Setup must have created the
// client before this is called.
//
// Continuation behavior:
//   - If the restriction cannot be claimed, processing resumes in 5 seconds.
//   - If no message arrives within messageTimeout, the receive is cancelled
//     and processing resumes in 10 seconds.
//   - If receiving fails with an error other than context cancellation, the
//     error is logged and processing stops without rescheduling.
func (r *pubSubRead) ProcessElement(ctx context.Context, bf beam.BundleFinalization, rt *sdf.LockRTracker, _ []byte, emit func(beam.EventTime, []byte)) (sdf.ProcessContinuation, error) {
	// emitted holds the messages emitted by this call. The finalization
	// callback closes over this local slice rather than shared DoFn state.
	// That way it acks only this bundle's messages, even if the DoFn instance
	// is reused for a later bundle before this one is finalized.
	var emitted []*pubsub.Message
	bf.RegisterCallback(5*time.Minute, func() error {
		for _, m := range emitted {
			m.Ack()
		}
		emitted = nil
		return nil
	})

	if !rt.TryClaim(r.Subscription) {
		return sdf.ResumeProcessingIn(5 * time.Second), nil
	}

	sub := r.client.Subscription(r.Subscription)
	recvCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Emitters are not thread safe and synchronous Receive() behavior is
	// deprecated, so messages are collected in a goroutine and handed over a
	// channel. The channel is unbuffered, so each message is either handed to
	// this goroutine and emitted, or nacked once recvCtx is cancelled.
	// This means no message is stranded in a buffer, and no Receive callback
	// blocks forever on a send after this method has returned.
	messChan := make(chan *pubsub.Message)
	go func(sendch chan<- *pubsub.Message) {
		err := sub.Receive(recvCtx, func(_ context.Context, m *pubsub.Message) {
			select {
			case sendch <- m:
			case <-recvCtx.Done():
				// Not emitted: release it for prompt redelivery.
				m.Nack()
			}
		})
		if err != nil && !errors.Is(err, context.Canceled) {
			log.Errorf(ctx, "error reading from PubSub: %v, stopping processing", err)
			cancel()
			// Receive has returned, so no callback can still send on sendch.
			close(sendch)
		}
	}(messChan)

	timeout := time.NewTimer(messageTimeout)
	defer timeout.Stop()
	for {
		select {
		case m, ok := <-messChan:
			if !ok {
				log.Debug(ctx, "stopping bundle processing")
				return sdf.StopProcessing(), nil
			}
			emitted = append(emitted, m)
			emit(beam.EventTime(m.PublishTime.UnixMilli()), m.Data)
			// Stop and drain before Reset, so a stale expiry cannot cut the
			// next wait short.
			if !timeout.Stop() {
				<-timeout.C
			}
			timeout.Reset(messageTimeout)
		case <-timeout.C:
			log.Debugf(ctx, "cancelling receive context, scheduling resumption")
			cancel()
			return sdf.ResumeProcessingIn(10 * time.Second), nil
		}
	}
}
// Read adds a streaming read from a PubSub topic to the pipeline and returns
// the received message payloads as a PCollection of byte slices.
//
// If a subscription with the given name exists for the topic, it is read
// from. Otherwise, a subscription with that name is created and read from.
// Read panics if the pipeline-construction checks fail, for example when the
// topic is empty or does not exist, or the subscription cannot be ensured.
//
// This is an example and subject to change, including its behavior and function signature.
// Please use the cross-language implementation Read() instead.
func Read(ctx context.Context, s beam.Scope, project, topic, subscription string) beam.PCollection {
	scope := s.Scope("pubsubio.NativeRead")
	psRead, err := newPubSubRead(ctx, project, topic, subscription)
	if err != nil {
		panic(err)
	}
	// A single impulse element drives the unbounded, self-rescheduling DoFn.
	impulse := beam.Impulse(scope)
	return beam.ParDo(scope, psRead, impulse)
}
// pubSubWrite is a structural DoFn that publishes each input element to one
// PubSub topic.
type pubSubWrite struct {
	// ProjectID is the project passed to pubsub.NewClient.
	ProjectID string
	// Topic is the ID of the topic to publish to.
	Topic string

	// client is runtime state populated by Setup; newPubSubWrite does not set it.
	client *pubsub.Client
}
// Setup creates a PubSub client for w.ProjectID unless the DoFn already has
// one. It returns any client-creation error.
func (w *pubSubWrite) Setup(ctx context.Context) error {
	if w.client != nil {
		return nil
	}
	client, err := pubsub.NewClient(ctx, w.ProjectID)
	if err != nil {
		return err
	}
	w.client = client
	return nil
}
// ProcessElement publishes elm as the data of one PubSub message to w.Topic.
// It blocks until the publish result is available and returns any publish
// error.
func (w *pubSubWrite) ProcessElement(ctx context.Context, elm []byte) error {
	top := w.client.Topic(w.Topic)
	// Publishing starts background goroutines owned by this Topic handle. A
	// fresh handle is made per element, so stop it to release them. The result
	// is awaited below, so nothing is left outstanding at Stop.
	defer top.Stop()
	result := top.Publish(ctx, &pubsub.Message{Data: elm})
	_, err := result.Get(ctx)
	return err
}
// newPubSubWrite builds a pubSubWrite DoFn for publishing to topic in
// projectID.
//
// At construction time it checks that the topic exists. It returns an error
// if topic is empty, the client cannot be created, or the topic does not
// exist or cannot be checked. The client used for the check is closed before
// returning; the DoFn creates its own client in Setup.
func newPubSubWrite(ctx context.Context, projectID, topic string) (*pubSubWrite, error) {
	if topic == "" {
		return nil, errors.New("please provide a topic to write to")
	}
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return nil, err
	}
	// Only needed for the existence check; close it so its connections are
	// not leaked. Close errors are ignored because nothing useful can be done
	// with them here.
	defer client.Close()

	top := client.Topic(topic)
	ok, err := top.Exists(ctx)
	if err != nil {
		return nil, fmt.Errorf("checking whether topic %q exists: %w", topic, err)
	}
	if !ok {
		return nil, fmt.Errorf("topic %q does not exist", topic)
	}
	return &pubSubWrite{ProjectID: projectID, Topic: top.ID()}, nil
}
// Write adds a step to the pipeline that publishes every byte slice in col to
// a PubSub topic.
//
// Write panics at pipeline construction time if the topic is empty, does not
// exist, or cannot be checked.
//
// This is an example and subject to change, including its behavior and function signature.
// Please use the cross-language implementation Write() instead.
func Write(ctx context.Context, s beam.Scope, col beam.PCollection, project, topic string) {
	scope := s.Scope("pubsubio.NativeWrite")
	psWrite, err := newPubSubWrite(ctx, project, topic)
	if err != nil {
		panic(err)
	}
	beam.ParDo0(scope, psWrite, col)
}