blob: d3928b8a2beaf36d8ef26fdd0ffb1b59265df498 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.scaladsl
import java.time.Instant
import java.util.Base64
import org.apache.pekko
import pekko.actor.{ ActorSystem, Cancellable }
import pekko.stream.RestartSettings
import pekko.stream.connectors.googlecloud.pubsub._
import pekko.stream.connectors.googlecloud.pubsub.scaladsl.GooglePubSub
import pekko.stream.scaladsl.{ Flow, FlowWithContext, RestartFlow, Sink, Source }
import pekko.{ Done, NotUsed }
import scala.collection.immutable.Seq
import scala.concurrent.duration._
import scala.concurrent.{ Future, Promise }
class ExampleUsage {
// #init-system
implicit val system: ActorSystem = ActorSystem()
val config = PubSubConfig()
val topic = "topic1"
val subscription = "subscription1"
// #init-system
// #publish-single
val publishMessage =
PublishMessage(new String(Base64.getEncoder.encode("Hello Google!".getBytes)))
val publishRequest = PublishRequest(Seq(publishMessage))
val source: Source[PublishRequest, NotUsed] = Source.single(publishRequest)
val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] =
GooglePubSub.publish(topic, config)
val publishedMessageIds: Future[Seq[Seq[String]]] = source.via(publishFlow).runWith(Sink.seq)
// #publish-single
// #publish-single-with-context
val publishMessageWithContext =
PublishMessage(new String(Base64.getEncoder.encode("Hello Google!".getBytes)))
val publishRequestWithContext = PublishRequest(Seq(publishMessage))
val resultPromise = Promise[Seq[String]]()
val sourceWithContext: Source[(PublishRequest, Promise[Seq[String]]), NotUsed] =
Source.single(publishRequest -> resultPromise)
val publishFlowWithContext
: FlowWithContext[PublishRequest, Promise[Seq[String]], Seq[String], Promise[Seq[String]], NotUsed] =
GooglePubSub.publishWithContext[Promise[Seq[String]]](topic, config)
val publishedMessageIdsWithContext: Future[Seq[(Seq[String], Promise[Seq[String]])]] =
sourceWithContext.via(publishFlowWithContext).runWith(Sink.seq)
// #publish-single-with-context
// #publish-fast
val messageSource: Source[PublishMessage, NotUsed] = Source(List(publishMessage, publishMessage))
messageSource
.groupedWithin(1000, 1.minute)
.map(grouped => PublishRequest(grouped))
.via(publishFlow)
.to(Sink.seq)
// #publish-fast
// #publish with ordering key
val publishToRegionalEndpointFlow: Flow[PublishRequest, Seq[String], NotUsed] =
GooglePubSub.publish(topic, config, "europe-west1-pubsub.googleapis.com")
val messageWithOrderingKey: PublishMessage =
PublishMessage(new String(Base64.getEncoder.encode("Hello Google!".getBytes)), None, Some("my-ordering-key"))
val publishedMessageWithOrderingKeyIds: Future[Seq[Seq[String]]] = Source
.single(PublishRequest(Seq(messageWithOrderingKey)))
.via(publishToRegionalEndpointFlow)
.runWith(Sink.seq)
// #publish with ordering key
// #subscribe
val subscriptionSource: Source[ReceivedMessage, Cancellable] =
GooglePubSub.subscribe(subscription, config)
val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
GooglePubSub.acknowledge(subscription, config)
subscriptionSource
.map { message =>
// do something fun
message.ackId
}
.groupedWithin(1000, 1.minute)
.map(AcknowledgeRequest.apply)
.to(ackSink)
// #subscribe
// #subscribe-source-control
Source
.tick(0.seconds, 10.seconds, Done)
.via(
RestartFlow.withBackoff(RestartSettings(1.second, 30.seconds, randomFactor = 0.2))(() =>
GooglePubSub.subscribeFlow(subscription, config)))
.map { message =>
// do something fun
message.ackId
}
.groupedWithin(1000, 1.minute)
.map(AcknowledgeRequest.apply)
.to(ackSink)
// #subscribe-source-control
// #subscribe-auto-ack
val subscribeMessageSoruce: Source[ReceivedMessage, NotUsed] = // ???
// #subscribe-auto-ack
Source.single(ReceivedMessage("id", PubSubMessage(Some("data"), None, "msg-id-1", Instant.now)))
// #subscribe-auto-ack
val processMessage: Sink[ReceivedMessage, NotUsed] = // ???
// #subscribe-auto-ack
Flow[ReceivedMessage].to(Sink.ignore)
// #subscribe-auto-ack
val batchAckSink =
Flow[ReceivedMessage].map(_.ackId).groupedWithin(1000, 1.minute).map(AcknowledgeRequest.apply).to(ackSink)
val q = subscribeMessageSoruce.alsoTo(batchAckSink).to(processMessage)
// #subscribe-auto-ack
}