package org.apache.nifi.processors.standard;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import jakarta.activation.DataHandler;
import jakarta.mail.Authenticator;
import jakarta.mail.Message;
import jakarta.mail.Message.RecipientType;
import jakarta.mail.MessagingException;
import jakarta.mail.PasswordAuthentication;
import jakarta.mail.Session;
import jakarta.mail.Transport;
import jakarta.mail.internet.AddressException;
import jakarta.mail.internet.InternetAddress;
import jakarta.mail.internet.MimeBodyPart;
import jakarta.mail.internet.MimeMessage;
import jakarta.mail.internet.MimeMultipart;
import jakarta.mail.internet.MimeUtility;
import jakarta.mail.internet.PreencodedMimeBodyPart;
import jakarta.mail.util.ByteArrayDataSource;
import org.apache.commons.codec.binary.Base64;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
@Tags({"email", "put", "notify", "smtp"})
@CapabilityDescription("Sends an e-mail to configured recipients for each incoming FlowFile")
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The entirety of the FlowFile's content (as a String object) "
+ "will be read into memory in case the property to use the flow file content as the email body is set to true.")
public class PutEmail extends AbstractProcessor {
public static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder()
.name("SMTP Hostname")
.description("The hostname of the SMTP host")
public static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder()
.name("SMTP Port")
.description("The Port used for SMTP communications")
public static final PropertyDescriptor SMTP_USERNAME = new PropertyDescriptor.Builder()
.name("SMTP Username")
.description("Username for the SMTP account")
public static final PropertyDescriptor SMTP_PASSWORD = new PropertyDescriptor.Builder()
.name("SMTP Password")
.description("Password for the SMTP account")
public static final PropertyDescriptor SMTP_AUTH = new PropertyDescriptor.Builder()
.name("SMTP Auth")
.description("Flag indicating whether authentication should be used")
public static final PropertyDescriptor SMTP_TLS = new PropertyDescriptor.Builder()
.name("SMTP TLS")
.displayName("SMTP STARTTLS")
.description("Flag indicating whether Opportunistic TLS should be enabled using STARTTLS command")
public static final PropertyDescriptor SMTP_SOCKET_FACTORY = new PropertyDescriptor.Builder()
.name("SMTP Socket Factory")
.description("Socket Factory to use for SMTP Connection")
public static final PropertyDescriptor HEADER_XMAILER = new PropertyDescriptor.Builder()
.name("SMTP X-Mailer Header")
.description("X-Mailer used in the header of the outgoing email")
public static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder()
.displayName("Attributes to Send as Headers (Regex)")
.description("A Regular Expression that is matched against all FlowFile attribute names. "
+ "Any attribute whose name matches the regex will be added to the Email messages as a Header. "
+ "If not specified, no FlowFile attributes will be added as headers.")
public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
.name("Content Type")
.description("Mime Type used to interpret the contents of the email, such as text/plain or text/html")
public static final PropertyDescriptor FROM = new PropertyDescriptor.Builder()
.description("Specifies the Email address to use as the sender. "
+ "Comma separated sequence of addresses following RFC822 syntax.")
public static final PropertyDescriptor TO = new PropertyDescriptor.Builder()
.description("The recipients to include in the To-Line of the email. "
+ "Comma separated sequence of addresses following RFC822 syntax.")
public static final PropertyDescriptor CC = new PropertyDescriptor.Builder()
.description("The recipients to include in the CC-Line of the email. "
+ "Comma separated sequence of addresses following RFC822 syntax.")
public static final PropertyDescriptor BCC = new PropertyDescriptor.Builder()
.description("The recipients to include in the BCC-Line of the email. "
+ "Comma separated sequence of addresses following RFC822 syntax.")
public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder()
.description("The email subject")
.defaultValue("Message from NiFi")
public static final PropertyDescriptor MESSAGE = new PropertyDescriptor.Builder()
.description("The body of the email message")
public static final PropertyDescriptor ATTACH_FILE = new PropertyDescriptor.Builder()
.name("Attach File")
.description("Specifies whether or not the FlowFile content should be attached to the email")
.allowableValues("true", "false")
public static final PropertyDescriptor CONTENT_AS_MESSAGE = new PropertyDescriptor.Builder()
.displayName("Flow file content as message")
.description("Specifies whether or not the FlowFile content should be the message of the email. If true, the 'Message' property is ignored.")
public static final PropertyDescriptor INCLUDE_ALL_ATTRIBUTES = new PropertyDescriptor.Builder()
.name("Include All Attributes In Message")
.description("Specifies whether or not all FlowFile attributes should be recorded in the body of the email message")
.allowableValues("true", "false")
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("FlowFiles that are successfully sent will be routed to this relationship")
public static final Relationship REL_FAILURE = new Relationship.Builder()
.description("FlowFiles that fail to send will be routed to this relationship")
private static final Charset CONTENT_CHARSET = StandardCharsets.UTF_8;
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
* Mapping of the mail properties to the NiFi PropertyDescriptors that will be evaluated at runtime
private static final Map<String, PropertyDescriptor> propertyToContext = new HashMap<>();
static {
propertyToContext.put("", SMTP_HOSTNAME);
propertyToContext.put("mail.smtp.port", SMTP_PORT);
propertyToContext.put("mail.smtp.socketFactory.port", SMTP_PORT);
propertyToContext.put("mail.smtp.socketFactory.class", SMTP_SOCKET_FACTORY);
propertyToContext.put("mail.smtp.auth", SMTP_AUTH);
propertyToContext.put("mail.smtp.starttls.enable", SMTP_TLS);
propertyToContext.put("mail.smtp.user", SMTP_USERNAME);
propertyToContext.put("mail.smtp.password", SMTP_PASSWORD);
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(INCLUDE_ALL_ATTRIBUTES); = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
this.relationships = Collections.unmodifiableSet(relationships);
public Set<Relationship> getRelationships() {
return relationships;
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context));
final String to = context.getProperty(TO).getValue();
final String cc = context.getProperty(CC).getValue();
final String bcc = context.getProperty(BCC).getValue();
if (to == null && cc == null && bcc == null) {
errors.add(new ValidationResult.Builder().subject("To, CC, BCC").valid(false).explanation("Must specify at least one To/CC/BCC address").build());
return errors;
private volatile Pattern attributeNamePattern = null;
public void onScheduled(final ProcessContext context) {
final String attributeNameRegex = context.getProperty(ATTRIBUTE_NAME_REGEX).getValue();
this.attributeNamePattern = attributeNameRegex == null ? null : Pattern.compile(attributeNameRegex);
private void setMessageHeader(final String header, final String value, final Message message) throws MessagingException {
final ComponentLog logger = getLogger();
try {
message.setHeader(header, MimeUtility.encodeText(value));
} catch (UnsupportedEncodingException e){
logger.warn("Unable to add header {} with value {} due to encoding exception", header, value);
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final FlowFile flowFile = session.get();
if (flowFile == null) {
final Properties properties = this.getMailPropertiesFromFlowFile(context, flowFile);
final Session mailSession = this.createMailSession(properties);
final Message message = new MimeMessage(mailSession);
try {
message.addFrom(toInetAddresses(context, flowFile, FROM));
message.setRecipients(RecipientType.TO, toInetAddresses(context, flowFile, TO));
message.setRecipients(RecipientType.CC, toInetAddresses(context, flowFile, CC));
message.setRecipients(RecipientType.BCC, toInetAddresses(context, flowFile, BCC));
if (attributeNamePattern != null) {
for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) {
if (attributeNamePattern.matcher(entry.getKey()).matches()) {
this.setMessageHeader(entry.getKey(), entry.getValue(), message);
this.setMessageHeader("X-Mailer", context.getProperty(HEADER_XMAILER).evaluateAttributeExpressions(flowFile).getValue(), message);
final String messageText = getMessage(flowFile, context, session);
final String contentType = context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(flowFile).getValue();
message.setContent(messageText, contentType);
message.setSentDate(new Date());
if (context.getProperty(ATTACH_FILE).asBoolean()) {
final MimeBodyPart mimeText = new PreencodedMimeBodyPart("base64");
mimeText.setDataHandler(new DataHandler(new ByteArrayDataSource(
Base64.encodeBase64(messageText.getBytes(CONTENT_CHARSET)), contentType + "; charset=\"utf-8\"")));
final MimeBodyPart mimeFile = new MimeBodyPart();, stream -> {
try {
mimeFile.setDataHandler(new DataHandler(new ByteArrayDataSource(stream, "application/octet-stream")));
} catch (final Exception e) {
throw new IOException(e);
final MimeMultipart multipart = new MimeMultipart();
session.getProvenanceReporter().send(flowFile, "mailto:" + message.getAllRecipients()[0].toString());
session.transfer(flowFile, REL_SUCCESS);
getLogger().debug("Sent email as a result of receiving {}", flowFile);
} catch (final ProcessException | MessagingException | IOException e) {
getLogger().error("Failed to send email for {}: {}; routing to failure", flowFile, e.getMessage(), e);
session.transfer(flowFile, REL_FAILURE);
private String getMessage(final FlowFile flowFile, final ProcessContext context, final ProcessSession session) {
String messageText = "";
if(context.getProperty(CONTENT_AS_MESSAGE).evaluateAttributeExpressions(flowFile).asBoolean()) {
// reading all the content of the input flow file
final byte[] byteBuffer = new byte[(int) flowFile.getSize()];, in -> StreamUtils.fillBuffer(in, byteBuffer, false));
messageText = new String(byteBuffer, 0, byteBuffer.length, CONTENT_CHARSET);
} else if (context.getProperty(MESSAGE).isSet()) {
messageText = context.getProperty(MESSAGE).evaluateAttributeExpressions(flowFile).getValue();
if (context.getProperty(INCLUDE_ALL_ATTRIBUTES).asBoolean()) {
return formatAttributes(flowFile, messageText);
return messageText;
* Based on the input properties, determine whether an authenticate or unauthenticated session should be used. If authenticated, creates a Password Authenticator for use in sending the email.
* @param properties mail properties
* @return session
private Session createMailSession(final Properties properties) {
final boolean auth = Boolean.parseBoolean(properties.getProperty("mail.smtp.auth"));
* Conditionally create a password authenticator if the 'auth' parameter is set.
return auth ? Session.getInstance(properties, new Authenticator() {
public PasswordAuthentication getPasswordAuthentication() {
final String username = properties.getProperty("mail.smtp.user");
final String password = properties.getProperty("mail.smtp.password");
return new PasswordAuthentication(username, password);
}) : Session.getInstance(properties); // without auth
* Uses the mapping of javax.mail properties to NiFi PropertyDescriptors to build the required Properties object to be used for sending this email
* @param context context
* @param flowFile flowFile
* @return mail properties
private Properties getMailPropertiesFromFlowFile(final ProcessContext context, final FlowFile flowFile) {
final Properties properties = new Properties();
for (final Entry<String, PropertyDescriptor> entry : propertyToContext.entrySet()) {
// Evaluate the property descriptor against the flow file
final String flowFileValue = context.getProperty(entry.getValue()).evaluateAttributeExpressions(flowFile).getValue();
final String property = entry.getKey();
// Nullable values are not allowed, so filter out
if (null != flowFileValue) {
properties.setProperty(property, flowFileValue);
return properties;
public static final String BODY_SEPARATOR = "\n\n--------------------------------------------------\n";
private static String formatAttributes(final FlowFile flowFile, final String messagePrepend) {
final StringBuilder message = new StringBuilder(messagePrepend);
message.append("\nStandard FlowFile Metadata:");
message.append(String.format("\n\t%1$s = '%2$s'", "id", flowFile.getAttribute(CoreAttributes.UUID.key())));
message.append(String.format("\n\t%1$s = '%2$s'", "entryDate", new Date(flowFile.getEntryDate())));
message.append(String.format("\n\t%1$s = '%2$s'", "fileSize", flowFile.getSize()));
message.append("\nFlowFile Attributes:");
for (final Entry<String, String> attribute : flowFile.getAttributes().entrySet()) {
message.append(String.format("\n\t%1$s = '%2$s'", attribute.getKey(), attribute.getValue()));
return message.toString();
* @param context the current context
* @param flowFile the current flow file
* @param propertyDescriptor the property to evaluate
* @return an InternetAddress[] parsed from the supplied property
* @throws AddressException if the property cannot be parsed to a valid InternetAddress[]
private InternetAddress[] toInetAddresses(final ProcessContext context, final FlowFile flowFile,
PropertyDescriptor propertyDescriptor) throws AddressException {
InternetAddress[] parse;
final String value = context.getProperty(propertyDescriptor).evaluateAttributeExpressions(flowFile).getValue();
if (value == null || value.isEmpty()){
if (propertyDescriptor.isRequired()) {
final String exceptionMsg = "Required property '" + propertyDescriptor.getDisplayName() + "' evaluates to an empty string.";
throw new AddressException(exceptionMsg);
} else {
parse = new InternetAddress[0];
} else {
try {
parse = InternetAddress.parse(value);
} catch (AddressException e) {
final String exceptionMsg = "Unable to parse a valid address for property '" + propertyDescriptor.getDisplayName() + "' with value '"+ value +"'";
throw new AddressException(exceptionMsg);
return parse;
* Wrapper for static method {@link Transport#send(Message)} to add testability of this class.
* @param msg the message to send
* @throws MessagingException on error
protected void send(final Message msg) throws MessagingException {