blob: 1d260b4d0d684095fd18725bdf5fcbd46f4d5fab [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.action.email;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import javax.activation.DataHandler;
import javax.activation.DataSource;
import javax.mail.Authenticator;
import javax.mail.Message;
import javax.mail.Message.RecipientType;
import javax.mail.MessagingException;
import javax.mail.Multipart;
import javax.mail.NoSuchProviderException;
import javax.mail.PasswordAuthentication;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.AddressException;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.action.ActionExecutorException.ErrorType;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.Namespace;
/**
* Email action executor. It takes the to and cc addresses along with a subject and a body
* (plus an optional content type and an optional list of attachments) and sends out an email.
*/
public class EmailActionExecutor extends ActionExecutor {
/** Common prefix of the configuration property names declared below. */
public static final String CONF_PREFIX = "oozie.email.";
/** Property holding the SMTP host name. */
public static final String EMAIL_SMTP_HOST = CONF_PREFIX + "smtp.host";
/** Property holding the SMTP port. */
public static final String EMAIL_SMTP_PORT = CONF_PREFIX + "smtp.port";
/** Property telling whether SMTP authentication is used. */
public static final String EMAIL_SMTP_AUTH = CONF_PREFIX + "smtp.auth";
/** Property holding the SMTP user name. */
public static final String EMAIL_SMTP_USER = CONF_PREFIX + "smtp.username";
/** Property holding the SMTP password. */
public static final String EMAIL_SMTP_PASS = CONF_PREFIX + "smtp.password";
/** Property holding the sender ("from") address. */
public static final String EMAIL_SMTP_FROM = CONF_PREFIX + "from.address";
/** Property telling whether attachments may be sent. */
public static final String EMAIL_ATTACHMENT_ENABLED = CONF_PREFIX + "attachment.enabled";

// Names of the child elements read from the action XML.
private static final String TO = "to";
private static final String CC = "cc";
private static final String SUB = "subject";
private static final String BOD = "body";
private static final String ATTACHMENT = "attachment";
private static final String CONTENT_TYPE = "content_type";

/** Separator between entries of the to, cc and attachment lists. */
private static final String COMMA = ",";
/** Content type used when the action XML does not provide one. */
private static final String DEFAULT_CONTENT_TYPE = "text/plain";

// NOTE(review): this logger is not referenced anywhere else in the class as shown.
private XLog LOG = XLog.getLog(getClass());

/** Note appended to the mail body when attachments were requested but sending them is disabled. */
public static final String EMAIL_ATTACHMENT_ERROR_MSG =
        "\n Note: This email is missing configured email attachments "
        + "as sending attachments in email action is disabled in the Oozie server. "
        + "It could be for security compliance with data protection or other reasons";
/**
 * Creates the executor, passing the action type name {@code "email"} to the
 * {@link ActionExecutor} constructor.
 */
public EmailActionExecutor() {
super("email");
}
/**
 * Initializes the action type. This override adds nothing of its own and only
 * delegates to the superclass implementation.
 */
@Override
public void initActionType() {
super.initActionType();
}
/**
 * Starts the action: records placeholder start data, parses the action configuration XML,
 * validates it and sends the mail, then records the execution status {@code "OK"}.
 *
 * @param context execution context that receives the start and execution data
 * @param action workflow action whose configuration holds the email definition
 * @throws ActionExecutorException if anything fails; every exception raised while parsing
 *         or mailing is passed through {@code convertException}
 */
@Override
public void start(Context context, WorkflowAction action) throws ActionExecutorException {
    try {
        // Placeholder values are stored for all three start-data fields.
        context.setStartData("-", "-", "-");
        validateAndMail(context, XmlUtils.parseXml(action.getConf()));
        context.setExecutionData("OK", null);
    }
    catch (Exception failure) {
        throw convertException(failure);
    }
}
/**
 * Reads the mail fields from the action XML and hands them to
 * {@link #email(String[], String[], String, String, String[], String, String)}.
 * <p>
 * The {@code to} element is mandatory and must not be empty. The {@code cc} and
 * {@code attachment} elements are optional: when they are absent or empty they contribute
 * no entries at all. A missing or empty {@code content_type} falls back to the default
 * content type.
 *
 * @param context execution context; only used to look up the workflow user
 * @param element root element of the email action XML
 * @throws ActionExecutorException with code {@code EM001} if no recipient is given, or any
 *         error raised while building or sending the message
 */
protected void validateAndMail(Context context, Element element) throws ActionExecutorException {
    // The XSD does the min/max occurrence validation for us.
    Namespace ns = element.getNamespace();

    // <to> - One ought to exist. A missing element is reported like an empty one
    // instead of surfacing as a NullPointerException.
    String toText = element.getChildTextTrim(TO, ns);
    if (toText == null || toText.isEmpty()) {
        throw new ActionExecutorException(ErrorType.ERROR, "EM001", "No receipents were specified in the to-address field.");
    }
    String[] tos = toText.split(COMMA);

    // <cc> - Optional, but only one ought to exist.
    // It is alright for cc to be given empty or not be present.
    String[] ccs = splitOptional(element.getChildTextTrim(CC, ns));

    // <subject> - One ought to exist.
    String subject = element.getChildTextTrim(SUB, ns);

    // <body> - One ought to exist.
    String body = element.getChildTextTrim(BOD, ns);

    // <attachment> - Optional
    String[] attachments = splitOptional(element.getChildTextTrim(ATTACHMENT, ns));

    // <content_type> - Optional
    String contentType = element.getChildTextTrim(CONTENT_TYPE, ns);
    if (contentType == null || contentType.isEmpty()) {
        contentType = DEFAULT_CONTENT_TYPE;
    }

    // All good - lets try to mail!
    email(tos, ccs, subject, body, attachments, contentType, context.getWorkflow().getUser());
}

/**
 * Splits the comma separated text of an optional element.
 * <p>
 * {@code "".split(",")} yields one empty entry rather than none, so empty text is handled
 * explicitly here.
 *
 * @param text trimmed element text, or {@code null} when the element is absent
 * @return the entries, or an empty array when the text is {@code null} or empty
 */
private static String[] splitOptional(String text) {
    if (text == null || text.isEmpty()) {
        return new String[0];
    }
    return text.split(COMMA);
}
/**
 * Builds a MIME message from the given fields and sends it over SMTP.
 * <p>
 * The SMTP host, port, authentication flag, credentials and sender address are read from the
 * Oozie configuration. Attachment entries are trimmed and blank entries are dropped. When
 * attachments remain and sending them is enabled, the message becomes a multipart message
 * holding the body followed by one part per attachment. When sending them is disabled, the
 * body is sent alone with {@link #EMAIL_ATTACHMENT_ERROR_MSG} appended.
 *
 * @param to recipient addresses; {@code null} is treated as an empty list
 * @param cc carbon-copy addresses; {@code null} is treated as an empty list
 * @param subject message subject
 * @param body message body
 * @param attachments URIs of the files to attach; may be {@code null} or empty
 * @param contentType content type used for the body when the message has no attachments
 * @param user user on whose behalf attachment files are opened
 * @throws ActionExecutorException with code {@code EM002}/{@code EM003} for a bad sender,
 *         {@code EM004}/{@code EM005} for recipient or content errors, {@code EM008} for
 *         attachment errors and {@code EM006}/{@code EM007} for transport errors
 */
public void email(String[] to, String[] cc, String subject, String body, String[] attachments, String contentType,
        String user) throws ActionExecutorException {
    // Get mailing server details.
    String smtpHost = getOozieConf().get(EMAIL_SMTP_HOST, "localhost");
    String smtpPort = getOozieConf().get(EMAIL_SMTP_PORT, "25");
    boolean smtpAuth = getOozieConf().getBoolean(EMAIL_SMTP_AUTH, false);
    String smtpUser = getOozieConf().get(EMAIL_SMTP_USER, "");
    String smtpPassword = getOozieConf().get(EMAIL_SMTP_PASS, "");
    String fromAddr = getOozieConf().get(EMAIL_SMTP_FROM, "oozie@localhost");

    Properties properties = new Properties();
    properties.setProperty("mail.smtp.host", smtpHost);
    properties.setProperty("mail.smtp.port", smtpPort);
    properties.setProperty("mail.smtp.auth", Boolean.toString(smtpAuth));

    // Do not use default instance (i.e. Session.getDefaultInstance)
    // (cause it may lead to issues when used second time).
    Session session = smtpAuth
            ? Session.getInstance(properties, new JavaMailAuthenticator(smtpUser, smtpPassword))
            : Session.getInstance(properties);

    Message message = new MimeMessage(session);

    try {
        message.setFrom(new InternetAddress(fromAddr));
    } catch (AddressException e) {
        throw new ActionExecutorException(ErrorType.ERROR, "EM002", "Bad from address specified in ${oozie.email.from.address}.", e);
    } catch (MessagingException e) {
        throw new ActionExecutorException(ErrorType.ERROR, "EM003", "Error setting a from address in the message.", e);
    }

    List<String> attachmentUris = cleanAttachments(attachments);

    try {
        // Add all <to>
        message.addRecipients(RecipientType.TO, parseAddresses(to));

        // Add all <cc>
        message.addRecipients(RecipientType.CC, parseAddresses(cc));

        // Set subject
        message.setSubject(subject);

        if (attachmentUris.isEmpty()) {
            message.setContent(body, contentType);
        }
        else if (ConfigurationService.getBoolean(EMAIL_ATTACHMENT_ENABLED)) {
            // when there is attachment
            message.setContent(buildMultipart(body, attachmentUris, user));
        }
        else {
            // Attachments were requested but the server does not allow them: say so in the body.
            message.setContent(body + EMAIL_ATTACHMENT_ERROR_MSG, contentType);
        }
    }
    catch (AddressException e) {
        throw new ActionExecutorException(ErrorType.ERROR, "EM004", "Bad address format in <to> or <cc>.", e);
    }
    catch (MessagingException e) {
        throw new ActionExecutorException(ErrorType.ERROR, "EM005", "An error occured while adding recipients.", e);
    }
    catch (URISyntaxException e) {
        throw new ActionExecutorException(ErrorType.ERROR, "EM008", "Encountered an error when attaching a file", e);
    }
    catch (HadoopAccessorException e) {
        throw new ActionExecutorException(ErrorType.ERROR, "EM008", "Encountered an error when attaching a file", e);
    }

    try {
        // Send over SMTP Transport
        // (Session+Message has adequate details.)
        Transport.send(message);
    } catch (NoSuchProviderException e) {
        throw new ActionExecutorException(ErrorType.ERROR, "EM006", "Could not find an SMTP transport provider to email.", e);
    } catch (MessagingException e) {
        throw new ActionExecutorException(ErrorType.ERROR, "EM007", "Encountered an error while sending the email message over SMTP.", e);
    }
}

/** Trims every attachment entry and drops the null or blank ones; never returns {@code null}. */
private static List<String> cleanAttachments(String[] attachments) {
    List<String> cleaned = new ArrayList<String>();
    if (attachments != null) {
        for (String attachment : attachments) {
            if (attachment == null) {
                continue;
            }
            String trimmed = attachment.trim();
            if (!trimmed.isEmpty()) {
                cleaned.add(trimmed);
            }
        }
    }
    return cleaned;
}

/** Parses each trimmed entry into an address; a {@code null} array yields an empty result. */
private static InternetAddress[] parseAddresses(String[] addresses) throws AddressException {
    if (addresses == null) {
        return new InternetAddress[0];
    }
    List<InternetAddress> parsed = new ArrayList<InternetAddress>(addresses.length);
    for (String address : addresses) {
        parsed.add(new InternetAddress(address.trim()));
    }
    return parsed.toArray(new InternetAddress[0]);
}

/** Builds the multipart content: the body text first, then one part per attachment URI. */
private Multipart buildMultipart(String body, List<String> attachmentUris, String user)
        throws MessagingException, URISyntaxException, HadoopAccessorException, ActionExecutorException {
    Multipart multipart = new MimeMultipart();

    // Set body text
    // NOTE(review): the body part is added with setText, so the configured content type is
    // not applied when attachments are sent - confirm whether that is intended.
    MimeBodyPart bodyTextPart = new MimeBodyPart();
    bodyTextPart.setText(body);
    multipart.addBodyPart(bodyTextPart);

    for (String attachment : attachmentUris) {
        URI attachUri = new URI(attachment);
        // URI schemes are case-insensitive, so "FILE:" must be rejected just like "file:".
        if ("file".equalsIgnoreCase(attachUri.getScheme())) {
            throw new ActionExecutorException(ErrorType.ERROR, "EM008",
                    "Encountered an error when attaching a file. A local file cannot be attached:"
                            + attachment);
        }
        MimeBodyPart messageBodyPart = new MimeBodyPart();
        DataSource source = new URIDataSource(attachUri, user);
        messageBodyPart.setDataHandler(new DataHandler(source));
        messageBodyPart.setFileName(new File(attachment).getName());
        multipart.addBodyPart(messageBodyPart);
    }
    return multipart;
}
/**
 * Ends the action by translating its external status into a workflow action status.
 * <p>
 * The status is {@code OK} only when the external status equals {@code "OK"}; any other
 * value, including a missing ({@code null}) external status, is reported as {@code ERROR}.
 *
 * @param context execution context that receives the end data
 * @param action workflow action whose external status is inspected
 * @throws ActionExecutorException declared by the overridden method; not thrown directly here
 */
@Override
public void end(Context context, WorkflowAction action) throws ActionExecutorException {
    // Constant-first comparison so that a null external status cannot raise a NullPointerException.
    boolean succeeded = "OK".equals(action.getExternalStatus());
    WorkflowAction.Status status = succeeded ? WorkflowAction.Status.OK : WorkflowAction.Status.ERROR;
    context.setEndData(status, getActionSignal(status));
}
/**
 * Does nothing: this executor's implementation of the check step has an empty body.
 *
 * @param context execution context (unused)
 * @param action workflow action (unused)
 * @throws ActionExecutorException declared by the overridden method; never thrown here
 */
@Override
public void check(Context context, WorkflowAction action)
throws ActionExecutorException {
}
/**
 * Does nothing: this executor's implementation of the kill step has an empty body.
 *
 * @param context execution context (unused)
 * @param action workflow action (unused)
 * @throws ActionExecutorException declared by the overridden method; never thrown here
 */
@Override
public void kill(Context context, WorkflowAction action)
throws ActionExecutorException {
}
/**
 * Reports whether the action is completed.
 *
 * @param externalStatus external status of the action (ignored)
 * @return always {@code true}
 */
@Override
public boolean isCompleted(String externalStatus) {
return true;
}
/**
 * {@link Authenticator} that always answers with the user name and password it was
 * constructed with.
 */
public static class JavaMailAuthenticator extends Authenticator {
    // The credentials are fixed at construction time and never reassigned.
    final String user;
    final String password;

    /**
     * @param user user name to authenticate with
     * @param password password to authenticate with
     */
    public JavaMailAuthenticator(String user, String password) {
        this.user = user;
        this.password = password;
    }

    /**
     * @return a new {@link PasswordAuthentication} carrying the stored user name and password
     */
    @Override
    protected PasswordAuthentication getPasswordAuthentication() {
        return new PasswordAuthentication(user, password);
    }
}
class URIDataSource implements DataSource{
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
FileSystem fs;
URI uri;
public URIDataSource(URI uri, String user) throws HadoopAccessorException {
this.uri = uri;
Configuration fsConf = has.createJobConf(uri.getAuthority());
fs = has.createFileSystem(user, uri, fsConf);
}
public InputStream getInputStream() throws IOException {
return fs.open(new Path(uri));
}
public OutputStream getOutputStream() throws IOException {
return fs.create(new Path(uri));
}
public String getContentType() {
return "application/octet-stream";
}
public String getName() {
return uri.getPath();
}
}
}