blob: 2d6ef861e883ea617c826c049519c2646b05a422 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.io;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import javax.mail.Authenticator;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.PasswordAuthentication;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import javax.validation.constraints.AssertTrue;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.StringUtils;
import com.google.common.collect.Maps;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.common.util.BaseOperator;
/**
* This operator outputs data to an smtp server.
* <p></p>
* @displayName Smtp Output
* @category Output
* @tags stmp, output operator
*
* @since 0.3.2
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
public class SmtpOutputOperator extends BaseOperator
{
public enum RecipientType
{
TO, CC, BCC
}
private static final Logger LOG = LoggerFactory.getLogger(SmtpOutputOperator.class);
@NotNull
private String subject;
@NotNull
private String content;
@NotNull
private String from;
private Map<String, String> recipients = Maps.newHashMap();
private int smtpPort = 587;
@NotNull
private String smtpHost;
private String smtpUserName;
private String smtpPassword;
private String contentType = "text/plain";
private boolean useSsl = false;
private boolean setupCalled = false;
protected transient Properties properties = System.getProperties();
protected transient Authenticator auth;
protected transient Session session;
protected transient Message message;
/**
* This is the port which receives the tuples that will be output to an smtp server.
*/
public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
{
@Override
public void process(Object t)
{
try {
String mailContent = content.replace("{}", t.toString());
message.setContent(mailContent, contentType);
LOG.info("Sending email for tuple {}", t.toString());
Transport.send(message);
} catch (MessagingException ex) {
LOG.error("Something wrong with sending email.", ex);
}
}
};
public String getSubject()
{
return subject;
}
public void setSubject(String subject)
{
this.subject = subject;
resetMessage();
}
public String getContent()
{
return content;
}
public void setContent(String content)
{
this.content = content;
resetMessage();
}
public String getFrom()
{
return from;
}
public void setFrom(String from)
{
this.from = from;
resetMessage();
}
public int getSmtpPort()
{
return smtpPort;
}
public void setSmtpPort(int smtpPort)
{
this.smtpPort = smtpPort;
reset();
}
public String getSmtpHost()
{
return smtpHost;
}
public void setSmtpHost(String smtpHost)
{
this.smtpHost = smtpHost;
reset();
}
public String getSmtpUserName()
{
return smtpUserName;
}
public void setSmtpUserName(String smtpUserName)
{
this.smtpUserName = smtpUserName;
reset();
}
public String getSmtpPassword()
{
return smtpPassword;
}
public void setSmtpPassword(String smtpPassword)
{
this.smtpPassword = smtpPassword;
reset();
}
public String getContentType()
{
return contentType;
}
public void setContentType(String contentType)
{
this.contentType = contentType;
resetMessage();
}
public boolean isUseSsl()
{
return useSsl;
}
public void setUseSsl(boolean useSsl)
{
this.useSsl = useSsl;
reset();
}
@Override
public void setup(OperatorContext context)
{
setupCalled = true;
reset();
}
private void reset()
{
if (!setupCalled) {
return;
}
if (!StringUtils.isBlank(smtpPassword)) {
properties.setProperty("mail.smtp.auth", "true");
properties.setProperty("mail.smtp.starttls.enable", "true");
if (useSsl) {
properties.setProperty("mail.smtp.socketFactory.port", String.valueOf(smtpPort));
properties.setProperty("mail.smtp.socketFactory.class", "javax.net.ssl.SSLSocketFactory");
properties.setProperty("mail.smtp.socketFactory.fallback", "false");
}
auth = new Authenticator()
{
@Override
protected PasswordAuthentication getPasswordAuthentication()
{
return new PasswordAuthentication(smtpUserName, smtpPassword);
}
};
}
properties.setProperty("mail.smtp.host", smtpHost);
properties.setProperty("mail.smtp.port", String.valueOf(smtpPort));
session = Session.getInstance(properties, auth);
resetMessage();
}
private void resetMessage()
{
if (!setupCalled) {
return;
}
try {
message = new MimeMessage(session);
message.setFrom(new InternetAddress(from));
for (Map.Entry<String, String> entry : recipients.entrySet()) {
RecipientType type = RecipientType.valueOf(entry.getKey().toUpperCase());
Message.RecipientType recipientType;
switch (type) {
case TO:
recipientType = Message.RecipientType.TO;
break;
case CC:
recipientType = Message.RecipientType.CC;
break;
case BCC:
default:
recipientType = Message.RecipientType.BCC;
break;
}
String[] addresses = entry.getValue().split(",");
for (String address : addresses) {
message.addRecipient(recipientType, new InternetAddress(address));
}
}
message.setSubject(subject);
LOG.debug("all recipients {}", Arrays.toString(message.getAllRecipients()));
} catch (MessagingException ex) {
throw new RuntimeException(ex);
}
}
public Map<String, String> getRecipients()
{
return recipients;
}
/**
* @param recipients : map from recipient type to coma separated list of addresses for e.g. to->abc@xyz.com,def@xyz.com
*/
public void setRecipients(Map<String, String> recipients)
{
this.recipients = recipients;
resetMessage();
}
@AssertTrue(message = "Please verify the recipients set")
private boolean isValid()
{
if (recipients.isEmpty()) {
return false;
}
for (Map.Entry<String, String> entry : recipients.entrySet()) {
if (entry.getKey().toUpperCase().equalsIgnoreCase(RecipientType.TO.toString())) {
if (entry.getValue() != null && entry.getValue().length() > 0) {
return true;
}
return false;
}
}
return false;
}
}