/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.kinesis.firehose;

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.kinesis.KinesisProcessorUtils;
import org.apache.nifi.processors.aws.v2.AbstractAwsSyncProcessor;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.firehose.FirehoseClient;
import software.amazon.awssdk.services.firehose.FirehoseClientBuilder;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponseEntry;
import software.amazon.awssdk.services.firehose.model.Record;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
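
/**
 * Publishes the content of incoming FlowFiles to Amazon Kinesis Firehose using the PutRecordBatch API.
 * FlowFiles are grouped by the evaluated delivery stream name and sent in batches; the per-record
 * results are written back to each FlowFile as attributes before routing to success or failure.
 */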
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. "
+ "In order to send data to firehose, the firehose delivery stream name has to be specified.")
@WritesAttributes({
@WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"),
@WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"),
@WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")})
public class PutKinesisFirehose extends AbstractAwsSyncProcessor<FirehoseClient, FirehoseClientBuilder> {
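// Names of the FlowFile attributes written by this processor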
public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message";
public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code";
public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id";
public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
.name("Amazon Kinesis Firehose Delivery Stream Name")
.description("The name of kinesis firehose delivery stream")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("Batch size for messages (1-500).")
.defaultValue("250")
.required(false)
.addValidator(StandardValidators.createLongValidator(1, 500, true))
.sensitive(false)
.build();
public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE_MB = new PropertyDescriptor.Builder()
.name("Max message buffer size")
.description("Max message buffer")
.defaultValue("1 MB")
.required(false)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.sensitive(false)
.build();
private static final List<PropertyDescriptor> properties = List.of(
KINESIS_FIREHOSE_DELIVERY_STREAM_NAME,
BATCH_SIZE,
REGION,
AWS_CREDENTIALS_PROVIDER_SERVICE,
MAX_MESSAGE_BUFFER_SIZE_MB,
TIMEOUT,
PROXY_CONFIGURATION_SERVICE,
ENDPOINT_OVERRIDE);
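
// Maximum size of a single message, re-exported from KinesisProcessorUtils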
public static final int MAX_MESSAGE_SIZE = KinesisProcessorUtils.MAX_MESSAGE_SIZE;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
protected FirehoseClientBuilder createClientBuilder(final ProcessContext context) {
return FirehoseClient.builder();
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
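// Select the FlowFiles to send in this batch, bounded by the configured batch size and cumulative buffer size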
final List<FlowFile> flowFiles = KinesisProcessorUtils.filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, AWS_KINESIS_FIREHOSE_ERROR_MESSAGE, getLogger());
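// Records to send and their source FlowFiles, grouped by delivery stream name and kept in matching order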
final Map<String, List<FlowFile>> hashFlowFiles = new HashMap<>();
final Map<String, List<Record>> recordHash = new HashMap<>();
final FirehoseClient client = getClient(context);
try {
final List<FlowFile> failedFlowFiles = new ArrayList<>();
final List<FlowFile> successfulFlowFiles = new ArrayList<>();
// Prepare batch of records
for (final FlowFile flowFile : flowFiles) {
final String firehoseStreamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).evaluateAttributeExpressions(flowFile).getValue();
final List<Record> recordsForStream = recordHash.computeIfAbsent(firehoseStreamName, k -> new ArrayList<>());
session.read(flowFile, in -> recordsForStream.add(Record.builder().data(SdkBytes.fromInputStream(in)).build()));
final List<FlowFile> flowFilesForStream = hashFlowFiles.computeIfAbsent(firehoseStreamName, k -> new ArrayList<>());
flowFilesForStream.add(flowFile);
}
for (final Map.Entry<String, List<Record>> entry : recordHash.entrySet()) {
final String streamName = entry.getKey();
final List<Record> records = entry.getValue();
if (!records.isEmpty()) {
// Send the batch
final PutRecordBatchRequest putRecordBatchRequest = PutRecordBatchRequest.builder()
.deliveryStreamName(streamName)
.records(records)
.build();
final PutRecordBatchResponse response = client.putRecordBatch(putRecordBatchRequest);
// Separate out the successful and failed flow files
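// Response entries are returned in the same order as the submitted records, so index i maps back to the i-th FlowFile for this stream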
final List<PutRecordBatchResponseEntry> responseEntries = response.requestResponses();
for (int i = 0; i < responseEntries.size(); i++) {
final PutRecordBatchResponseEntry responseEntry = responseEntries.get(i);
FlowFile flowFile = hashFlowFiles.get(streamName).get(i);
final Map<String, String> attributes = new HashMap<>();
attributes.put(AWS_KINESIS_FIREHOSE_RECORD_ID, responseEntry.recordId());
if (StringUtils.isNotBlank(responseEntry.errorCode())) {
attributes.put(AWS_KINESIS_FIREHOSE_ERROR_CODE, responseEntry.errorCode());
attributes.put(AWS_KINESIS_FIREHOSE_ERROR_MESSAGE, responseEntry.errorMessage());
flowFile = session.putAllAttributes(flowFile, attributes);
failedFlowFiles.add(flowFile);
} else {
flowFile = session.putAllAttributes(flowFile, attributes);
successfulFlowFiles.add(flowFile);
}
}
records.clear();
}
}
if (!failedFlowFiles.isEmpty()) {
session.transfer(failedFlowFiles, REL_FAILURE);
getLogger().error("Failed to publish to Kinesis Firehose {}", failedFlowFiles);
}
if (!successfulFlowFiles.isEmpty()) {
session.transfer(successfulFlowFiles, REL_SUCCESS);
getLogger().info("Successfully published to Kinesis Firehose {}", successfulFlowFiles);
}
} catch (final Exception exception) {
getLogger().error("Failed to publish to kinesis firehose {} with exception {}", flowFiles, exception);
session.transfer(flowFiles, REL_FAILURE);
context.yield();
}
}
}