// blob: 70b9e26ec7534692f93f6583ff2fc6caf3221ad1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.pubsub;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.common.collect.ImmutableList;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_DESCRIPTION;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION;
/**
 * Processor that pulls messages from a Google Cloud Pub/Sub subscription and emits one FlowFile per
 * received message on {@code REL_SUCCESS}.
 *
 * <p>Lifecycle: the subscriber stub and the pull request are built in {@link #onScheduled(ProcessContext)}
 * and the stub is released in {@link #onStopped()}. If the stub cannot be created, the failure is
 * remembered and reported by {@link #onTrigger(ProcessContext, ProcessSession)}, which then yields.
 *
 * <p>NOTE(review): {@code MESSAGE_ID_ATTRIBUTE} is written to every FlowFile but is not declared in
 * {@code @WritesAttributes} below - confirm whether it should be documented there.
 */
@SeeAlso({PublishGCPubSub.class})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "consume"})
@CapabilityDescription("Consumes message from the configured Google Cloud PubSub subscription. If the 'Batch Size' is set, " +
        "the configured number of messages will be pulled in a single request, else only one message will be pulled.")
@WritesAttributes({
        @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description = ACK_ID_DESCRIPTION),
        @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE, description = SERIALIZED_SIZE_DESCRIPTION),
        @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
        @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = MSG_PUBLISH_TIME_DESCRIPTION),
        @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
})
public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {

    /**
     * Subscription to consume from. The value may be a bare subscription name (combined with the
     * configured project id) or a full path containing {@code '/'}; see
     * {@link #getSubscriptionName(ProcessContext)}.
     */
    public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
            .name("gcp-pubsub-subscription")
            .displayName("Subscription")
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .description("Name of the Google Cloud Pub/Sub Subscription")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

    /** Stub used for pull and acknowledge calls; {@code null} while unscheduled or when creation failed. */
    private SubscriberStub subscriber = null;

    /** Pull request built once per scheduling from the configured batch size and subscription. */
    private PullRequest pullRequest;

    /** Failure raised while creating the stub in the current scheduling, if any. */
    private final AtomicReference<Exception> storedException = new AtomicReference<>();

    /**
     * Builds the pull request and creates the subscriber stub.
     *
     * <p>An {@link IOException} from stub creation is not propagated: it is logged and stored so that
     * {@code onTrigger} can report it and yield.
     *
     * @param context supplies the batch size, subscription, project id and credentials
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        // Forget a failure recorded by an earlier scheduling so it cannot be reported for this one.
        storedException.set(null);

        final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
        pullRequest = PullRequest.newBuilder()
                .setMaxMessages(batchSize)
                .setSubscription(getSubscriptionName(context))
                .build();

        try {
            subscriber = getSubscriber(context);
        } catch (IOException e) {
            storedException.set(e);
            getLogger().error("Failed to create Google Cloud Subscriber due to {}", new Object[]{e});
        }
    }

    /**
     * Shuts down the subscriber stub, if one was created, and drops the reference to it.
     *
     * <p>Clearing the field matters: otherwise a later scheduling whose stub creation fails would
     * leave {@code onTrigger} holding the already shut-down stub instead of seeing {@code null}.
     */
    @OnStopped
    public void onStopped() {
        final SubscriberStub stub = subscriber;
        subscriber = null;
        if (stub != null) {
            stub.shutdown();
        }
    }

    /**
     * @return the properties supported by this processor: project id, credentials service,
     *         subscription and batch size
     */
    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return ImmutableList.of(PROJECT_ID,
                GCP_CREDENTIALS_PROVIDER_SERVICE,
                SUBSCRIPTION,
                BATCH_SIZE);
    }

    /**
     * @return a single-element set containing {@code REL_SUCCESS}
     */
    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    /**
     * Pulls one batch of messages and turns each message into a FlowFile routed to {@code REL_SUCCESS}.
     *
     * <p>For every received message that carries a payload, the ack id, serialized size, message id,
     * attribute count and publish time (seconds) are written as FlowFile attributes, followed by the
     * message's own attributes; the message data becomes the FlowFile content. The collected ack ids
     * are acknowledged from the callback passed to {@code session.commitAsync}.
     *
     * <p>If no subscriber stub is available, the reason is logged and the processor yields.
     *
     * @param context supplies the subscription and project id
     * @param session session used to create, populate and transfer the FlowFiles
     * @throws ProcessException declared by the processor contract
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        // Work on a local copy so the null check, the pull and the later acknowledgement all use the
        // same stub even if onStopped clears the field in the meantime.
        final SubscriberStub activeSubscriber = subscriber;
        if (activeSubscriber == null) {
            if (storedException.get() != null) {
                getLogger().error("Failed to create Google Cloud PubSub subscriber due to {}", storedException.get());
            } else {
                getLogger().error("Google Cloud PubSub Subscriber was not properly created. Yielding the processor...");
            }
            context.yield();
            return;
        }

        final PullResponse pullResponse = activeSubscriber.pullCallable().call(pullRequest);
        final List<String> ackIds = new ArrayList<>();
        final String subscriptionName = getSubscriptionName(context);

        for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
            if (message.hasMessage()) {
                FlowFile flowFile = session.create();

                final Map<String, String> attributes = new HashMap<>();
                ackIds.add(message.getAckId());
                attributes.put(ACK_ID_ATTRIBUTE, message.getAckId());
                attributes.put(SERIALIZED_SIZE_ATTRIBUTE, String.valueOf(message.getSerializedSize()));
                attributes.put(MESSAGE_ID_ATTRIBUTE, message.getMessage().getMessageId());
                attributes.put(MSG_ATTRIBUTES_COUNT_ATTRIBUTE, String.valueOf(message.getMessage().getAttributesCount()));
                attributes.put(MSG_PUBLISH_TIME_ATTRIBUTE, String.valueOf(message.getMessage().getPublishTime().getSeconds()));
                // Added last, so a message attribute with the same key replaces the value set above.
                attributes.putAll(message.getMessage().getAttributesMap());

                flowFile = session.putAllAttributes(flowFile, attributes);
                flowFile = session.write(flowFile, out -> out.write(message.getMessage().getData().toByteArray()));

                session.transfer(flowFile, REL_SUCCESS);
                session.getProvenanceReporter().receive(flowFile, subscriptionName);
            }
        }

        // Acknowledge from the commit callback rather than inline, so acks are tied to the commit.
        session.commitAsync(() -> acknowledgeAcks(activeSubscriber, ackIds, subscriptionName));
    }

    /**
     * Sends a single acknowledge request for the given ack ids; does nothing when there are none.
     *
     * @param stub             stub that performed the pull and should carry the acknowledgement
     * @param ackIds           ack ids of the messages to acknowledge; may be {@code null} or empty
     * @param subscriptionName fully qualified subscription the messages were pulled from
     */
    private void acknowledgeAcks(final SubscriberStub stub, final Collection<String> ackIds, final String subscriptionName) {
        if (ackIds == null || ackIds.isEmpty()) {
            return;
        }

        AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
                .addAllAckIds(ackIds)
                .setSubscription(subscriptionName)
                .build();
        stub.acknowledgeCallable().call(acknowledgeRequest);
    }

    /**
     * Resolves the configured subscription to its string form.
     *
     * <p>A value containing {@code '/'} is parsed as a complete subscription path; any other value is
     * combined with the configured project id.
     *
     * @param context supplies the subscription and project id properties
     * @return the subscription name as produced by {@code ProjectSubscriptionName.toString()}
     */
    private String getSubscriptionName(final ProcessContext context) {
        final String subscriptionName = context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue();
        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();

        if (subscriptionName.contains("/")) {
            return ProjectSubscriptionName.parse(subscriptionName).toString();
        } else {
            return ProjectSubscriptionName.of(projectId, subscriptionName).toString();
        }
    }

    /**
     * Creates a gRPC subscriber stub that authenticates with the credentials obtained from
     * {@code getGoogleCredentials(context)}.
     *
     * @param context supplies the credentials configuration
     * @return a newly created subscriber stub
     * @throws IOException if the stub settings or the stub itself cannot be created
     */
    private SubscriberStub getSubscriber(final ProcessContext context) throws IOException {
        final SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder()
                .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
                .build();

        return GrpcSubscriberStub.create(subscriberStubSettings);
    }
}