blob: 700627205652874e8a8b1948165c7852bf9917ba [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.record.sink;
import jakarta.mail.Authenticator;
import jakarta.mail.Message;
import jakarta.mail.MessagingException;
import jakarta.mail.PasswordAuthentication;
import jakarta.mail.Session;
import jakarta.mail.Transport;
import jakarta.mail.internet.AddressException;
import jakarta.mail.internet.InternetAddress;
import jakarta.mail.internet.MimeMessage;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@Tags({"email", "smtp", "record", "sink", "send", "write"})
@CapabilityDescription("Provides a RecordSinkService that can be used to send records in email using the specified writer for formatting.")
public class EmailRecordSink extends AbstractControllerService implements RecordSinkService {
private static final String RFC822 = "Comma separated sequence of addresses following RFC822 syntax.";
public static final PropertyDescriptor FROM = new PropertyDescriptor.Builder()
.description("Specifies the Email address to use as the sender. " + RFC822)
public static final PropertyDescriptor TO = new PropertyDescriptor.Builder()
.description("The recipients to include in the To-Line of the email. " + RFC822)
public static final PropertyDescriptor CC = new PropertyDescriptor.Builder()
.description("The recipients to include in the CC-Line of the email. " + RFC822)
public static final PropertyDescriptor BCC = new PropertyDescriptor.Builder()
.description("The recipients to include in the BCC-Line of the email. " + RFC822)
public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder()
.description("The email subject")
.defaultValue("Message from NiFi")
public static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder()
.displayName("SMTP Hostname")
.description("The hostname of the SMTP Server that is used to send Email Notifications")
public static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder()
.displayName("SMTP Port")
.description("The Port used for SMTP communications")
public static final PropertyDescriptor SMTP_AUTH = new PropertyDescriptor.Builder()
.displayName("SMTP Auth")
.description("Flag indicating whether authentication should be used")
public static final PropertyDescriptor SMTP_USERNAME = new PropertyDescriptor.Builder()
.displayName("SMTP Username")
.description("Username for the SMTP account")
.dependsOn(SMTP_AUTH, "true")
public static final PropertyDescriptor SMTP_PASSWORD = new PropertyDescriptor.Builder()
.displayName("SMTP Password")
.description("Password for the SMTP account")
.dependsOn(SMTP_AUTH, "true")
public static final PropertyDescriptor SMTP_STARTTLS = new PropertyDescriptor.Builder()
.displayName("SMTP STARTTLS")
.description("Flag indicating whether STARTTLS should be enabled. "
+ "If the server does not support STARTTLS, the connection continues without the use of TLS")
public static final PropertyDescriptor SMTP_SSL = new PropertyDescriptor.Builder()
.displayName("SMTP SSL")
.description("Flag indicating whether SSL should be enabled")
public static final PropertyDescriptor HEADER_XMAILER = new PropertyDescriptor.Builder()
.displayName("SMTP X-Mailer Header")
.description("X-Mailer used in the header of the outgoing email")
private volatile RecordSetWriterFactory writerFactory;
* Mapping of the mail properties to the NiFi PropertyDescriptors that will be evaluated at runtime
private static final Map<String, PropertyDescriptor> propertyToContext = new HashMap<>();
static {
propertyToContext.put("", SMTP_HOSTNAME);
propertyToContext.put("mail.smtp.port", SMTP_PORT);
propertyToContext.put("mail.smtps.port", SMTP_PORT);
propertyToContext.put("mail.smtp.socketFactory.port", SMTP_PORT);
propertyToContext.put("mail.smtp.ssl.enable", SMTP_SSL);
propertyToContext.put("mail.smtp.auth", SMTP_AUTH);
propertyToContext.put("mail.smtp.starttls.enable", SMTP_STARTTLS);
propertyToContext.put("mail.smtp.user", SMTP_USERNAME);
propertyToContext.put("mail.smtp.password", SMTP_PASSWORD);
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.unmodifiableList(Arrays.asList(
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context));
final String to = context.getProperty(TO).getValue();
final String cc = context.getProperty(CC).getValue();
final String bcc = context.getProperty(BCC).getValue();
if (to == null && cc == null && bcc == null) {
errors.add(new ValidationResult.Builder().subject("To, CC, BCC").valid(false).explanation("Must specify at least one To/CC/BCC address").build());
return errors;
public void onEnabled(final ConfigurationContext context) {
writerFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
public WriteResult sendData(final RecordSet recordSet, final Map<String, String> attributes, final boolean sendZeroResults) throws IOException {
WriteResult writeResult;
try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), recordSet.getSchema(), out, attributes)) {
Record r;
while ((r = != null) {
writeResult = writer.finishRecordSet();
sendMessage(getConfigurationContext(), out.toString());
} catch (SchemaNotFoundException e) {
final String errorMessage = String.format("RecordSetWriter could not be created because the schema was not found. The schema name for the RecordSet to write is %s",
throw new ProcessException(errorMessage, e);
return writeResult;
* Wrapper for static method {@link Transport#send(Message)} to add testability of this class.
* @param msg the message to send
* @throws MessagingException on error
protected void send(final Message msg) throws MessagingException {
private void sendMessage(final ConfigurationContext context, final String messageText) {
final Properties properties = getMailProperties(context);
final Session mailSession = createMailSession(properties);
final Message message = new MimeMessage(mailSession);
try {
final InternetAddress[] toAddresses = toInternetAddresses(context, TO);
message.setRecipients(Message.RecipientType.TO, toAddresses);
final InternetAddress[] ccAddresses = toInternetAddresses(context, CC);
message.setRecipients(Message.RecipientType.CC, ccAddresses);
final InternetAddress[] bccAddresses = toInternetAddresses(context, BCC);
message.setRecipients(Message.RecipientType.BCC, bccAddresses);
message.setHeader("X-Mailer", context.getProperty(HEADER_XMAILER).evaluateAttributeExpressions().getValue());
message.setContent(messageText, "text/plain");
message.setSentDate(new Date());
} catch (final ProcessException | MessagingException e) {
final String errorMessage = String.format("Send Failed using SMTP Host [%s] Port [%s]", properties.get(""), properties.get("mail.smtp.port"));
throw new RuntimeException(errorMessage, e);
* Uses the mapping of jakarta.mail properties to NiFi PropertyDescriptors to build the required Properties object to be used for sending this email
* @param context context
* @return mail properties
private Properties getMailProperties(final ConfigurationContext context) {
final Properties properties = new Properties();
for (Map.Entry<String, PropertyDescriptor> entry : propertyToContext.entrySet()) {
// Evaluate the property descriptor against the variable registry
String property = entry.getKey();
String propValue = context.getProperty(entry.getValue()).evaluateAttributeExpressions().getValue();
// Nullable values are not allowed, so filter out
if (StringUtils.isNotBlank(propValue)) {
properties.setProperty(property, propValue);
return properties;
* Based on the input properties, determine whether an authenticate or unauthenticated session should be used. If authenticated, creates a Password Authenticator for use in sending the email.
* @param properties mail properties
* @return session
private Session createMailSession(final Properties properties) {
final boolean auth = Boolean.parseBoolean(properties.getProperty("mail.smtp.auth"));
return auth ? Session.getInstance(properties, new Authenticator() {
public PasswordAuthentication getPasswordAuthentication() {
final String username = properties.getProperty("mail.smtp.user");
final String password = properties.getProperty("mail.smtp.password");
return new PasswordAuthentication(username, password);
}) : Session.getInstance(properties); // without auth
* @param context the current context
* @param propertyDescriptor the property to evaluate
* @return an InternetAddress[] parsed from the supplied property
* @throws AddressException if the property cannot be parsed to a valid InternetAddress[]
private InternetAddress[] toInternetAddresses(final ConfigurationContext context, PropertyDescriptor propertyDescriptor) throws AddressException {
InternetAddress[] parse;
final String value = context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue();
if (value == null || value.isEmpty()) {
if (propertyDescriptor.isRequired()) {
final String exceptionMsg = "Required property '" + propertyDescriptor.getDisplayName() + "' evaluates to an empty string.";
throw new AddressException(exceptionMsg);
} else {
parse = new InternetAddress[0];
} else {
try {
parse = InternetAddress.parse(value);
} catch (AddressException e) {
final String exceptionMsg = "Unable to parse a valid address for property '" + propertyDescriptor.getDisplayName() + "' with value '" + value + "'";
throw new AddressException(exceptionMsg);
return parse;