| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.streaming.connectors.gcp.pubsub.emulator; |
| |
| import com.google.api.gax.rpc.NotFoundException; |
| import com.google.api.gax.rpc.TransportChannelProvider; |
| import com.google.cloud.pubsub.v1.MessageReceiver; |
| import com.google.cloud.pubsub.v1.Publisher; |
| import com.google.cloud.pubsub.v1.Subscriber; |
| import com.google.cloud.pubsub.v1.SubscriptionAdminClient; |
| import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; |
| import com.google.cloud.pubsub.v1.TopicAdminClient; |
| import com.google.cloud.pubsub.v1.TopicAdminSettings; |
| import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub; |
| import com.google.cloud.pubsub.v1.stub.SubscriberStub; |
| import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; |
| import com.google.pubsub.v1.AcknowledgeRequest; |
| import com.google.pubsub.v1.ProjectSubscriptionName; |
| import com.google.pubsub.v1.PullRequest; |
| import com.google.pubsub.v1.PushConfig; |
| import com.google.pubsub.v1.ReceivedMessage; |
| import com.google.pubsub.v1.Topic; |
| import com.google.pubsub.v1.TopicName; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.List; |
| import java.util.stream.Collectors; |
| |
| /** A helper class to make managing the testing topics a bit easier. */ |
| public class PubsubHelper { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(PubsubHelper.class); |
| |
| private TransportChannelProvider channelProvider; |
| |
| private TopicAdminClient topicClient; |
| private SubscriptionAdminClient subscriptionAdminClient; |
| |
| public PubsubHelper(TransportChannelProvider channelProvider) { |
| this.channelProvider = channelProvider; |
| } |
| |
| public TopicAdminClient getTopicAdminClient() throws IOException { |
| if (topicClient == null) { |
| TopicAdminSettings topicAdminSettings = |
| TopicAdminSettings.newBuilder() |
| .setTransportChannelProvider(channelProvider) |
| .setCredentialsProvider(EmulatorCredentialsProvider.create()) |
| .build(); |
| topicClient = TopicAdminClient.create(topicAdminSettings); |
| } |
| return topicClient; |
| } |
| |
| public Topic createTopic(String project, String topic) throws IOException { |
| deleteTopic(project, topic); |
| TopicName topicName = TopicName.of(project, topic); |
| TopicAdminClient adminClient = getTopicAdminClient(); |
| LOG.info("CreateTopic {}", topicName); |
| return adminClient.createTopic(topicName); |
| } |
| |
| public void deleteTopic(String project, String topic) throws IOException { |
| deleteTopic(TopicName.of(project, topic)); |
| } |
| |
| public void deleteTopic(TopicName topicName) throws IOException { |
| TopicAdminClient adminClient = getTopicAdminClient(); |
| try { |
| adminClient.getTopic(topicName); |
| } catch (NotFoundException e) { |
| // Doesn't exist. Good. |
| return; |
| } |
| |
| // If it exists we delete all subscriptions and the topic itself. |
| LOG.info("DeleteTopic {} first delete old subscriptions.", topicName); |
| adminClient |
| .listTopicSubscriptions(topicName) |
| .iterateAllAsProjectSubscriptionName() |
| .forEach(subscriptionAdminClient::deleteSubscription); |
| LOG.info("DeleteTopic {}", topicName); |
| adminClient.deleteTopic(topicName); |
| } |
| |
| public SubscriptionAdminClient getSubscriptionAdminClient() throws IOException { |
| if (subscriptionAdminClient == null) { |
| SubscriptionAdminSettings subscriptionAdminSettings = |
| SubscriptionAdminSettings.newBuilder() |
| .setTransportChannelProvider(channelProvider) |
| .setCredentialsProvider(EmulatorCredentialsProvider.create()) |
| .build(); |
| subscriptionAdminClient = SubscriptionAdminClient.create(subscriptionAdminSettings); |
| } |
| return subscriptionAdminClient; |
| } |
| |
| public void createSubscription( |
| String subscriptionProject, String subscription, String topicProject, String topic) |
| throws IOException { |
| ProjectSubscriptionName subscriptionName = |
| ProjectSubscriptionName.newBuilder() |
| .setProject(subscriptionProject) |
| .setSubscription(subscription) |
| .build(); |
| |
| deleteSubscription(subscriptionName); |
| |
| TopicName topicName = TopicName.of(topicProject, topic); |
| |
| PushConfig pushConfig = PushConfig.getDefaultInstance(); |
| |
| LOG.info("CreateSubscription {}", subscriptionName); |
| getSubscriptionAdminClient() |
| .createSubscription(subscriptionName, topicName, pushConfig, 1) |
| .isInitialized(); |
| } |
| |
| public void deleteSubscription(String subscriptionProject, String subscription) |
| throws IOException { |
| deleteSubscription( |
| ProjectSubscriptionName.newBuilder() |
| .setProject(subscriptionProject) |
| .setSubscription(subscription) |
| .build()); |
| } |
| |
| public void deleteSubscription(ProjectSubscriptionName subscriptionName) throws IOException { |
| SubscriptionAdminClient adminClient = getSubscriptionAdminClient(); |
| try { |
| adminClient.getSubscription(subscriptionName); |
| // If it already exists we must first delete it. |
| LOG.info("DeleteSubscription {}", subscriptionName); |
| adminClient.deleteSubscription(subscriptionName); |
| } catch (NotFoundException e) { |
| // Doesn't exist. Good. |
| } |
| } |
| |
| // |
| // Mostly copied from the example on https://cloud.google.com/pubsub/docs/pull |
| // Licensed under the Apache 2.0 License to "Google LLC" from |
| // https://github.com/googleapis/google-cloud-java/blob/master/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java. |
| // |
| public List<ReceivedMessage> pullMessages( |
| String projectId, String subscriptionId, int maxNumberOfMessages) throws Exception { |
| SubscriberStubSettings subscriberStubSettings = |
| SubscriberStubSettings.newBuilder() |
| .setTransportChannelProvider(channelProvider) |
| .setCredentialsProvider(EmulatorCredentialsProvider.create()) |
| .build(); |
| try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) { |
| String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId); |
| PullRequest pullRequest = |
| PullRequest.newBuilder() |
| .setMaxMessages(maxNumberOfMessages) |
| .setSubscription(subscriptionName) |
| .build(); |
| |
| List<ReceivedMessage> receivedMessages = |
| subscriber.pullCallable().call(pullRequest).getReceivedMessagesList(); |
| acknowledgeIds(subscriber, subscriptionName, receivedMessages); |
| return receivedMessages; |
| } |
| } |
| |
| private void acknowledgeIds( |
| SubscriberStub subscriber, |
| String subscriptionName, |
| List<ReceivedMessage> receivedMessages) { |
| if (receivedMessages.isEmpty()) { |
| return; |
| } |
| |
| List<String> ackIds = |
| receivedMessages.stream() |
| .map(ReceivedMessage::getAckId) |
| .collect(Collectors.toList()); |
| // acknowledge received messages |
| AcknowledgeRequest acknowledgeRequest = |
| AcknowledgeRequest.newBuilder() |
| .setSubscription(subscriptionName) |
| .addAllAckIds(ackIds) |
| .build(); |
| // use acknowledgeCallable().futureCall to asynchronously perform this operation |
| subscriber.acknowledgeCallable().call(acknowledgeRequest); |
| } |
| |
| public Subscriber subscribeToSubscription( |
| String project, String subscription, MessageReceiver messageReceiver) { |
| ProjectSubscriptionName subscriptionName = |
| ProjectSubscriptionName.of(project, subscription); |
| Subscriber subscriber = |
| Subscriber.newBuilder(subscriptionName, messageReceiver) |
| .setChannelProvider(channelProvider) |
| .setCredentialsProvider(EmulatorCredentialsProvider.create()) |
| .build(); |
| subscriber.startAsync(); |
| return subscriber; |
| } |
| |
| public Publisher createPublisher(String project, String topic) throws IOException { |
| return Publisher.newBuilder(TopicName.of(project, topic)) |
| .setChannelProvider(channelProvider) |
| .setCredentialsProvider(EmulatorCredentialsProvider.create()) |
| .build(); |
| } |
| } |