changes for smtp.
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java
index 302e703..1d97625 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java
@@ -23,7 +23,7 @@
import com.datatorrent.demos.machinedata.operator.MachineInfoAveragingOperator;
import com.datatorrent.demos.machinedata.operator.MachineInfoAveragingPrerequisitesOperator;
import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator;
-import com.datatorrent.lib.io.SmtpOutputOperator;
+import com.datatorrent.lib.io.smtp.SmtpIdempotentOutputOperator;
import java.util.Map;
@@ -58,7 +58,7 @@
MachineInfoAveragingOperator averageOperator = dag.addOperator("AverageCalculator", MachineInfoAveragingOperator.class);
RedisKeyValPairOutputOperator<MachineKey, Map<String, String>> redisAvgOperator = dag.addOperator("Persister", new RedisKeyValPairOutputOperator<MachineKey, Map<String, String>>());
dag.addStream("Average", averageOperator.outputPort, redisAvgOperator.input);
- SmtpOutputOperator smtpOutputOperator = dag.addOperator("Alerter", new SmtpOutputOperator());
+ SmtpIdempotentOutputOperator smtpOutputOperator = dag.addOperator("Alerter", new SmtpIdempotentOutputOperator());
dag.addStream("Aggregates", prereqAverageOper.outputPort, averageOperator.inputPort);
dag.addStream("Alerts", averageOperator.smtpAlert, smtpOutputOperator.input);
return prereqAverageOper;
diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithAlert.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithAlert.java
index 945bcc8..9eaa002 100644
--- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithAlert.java
+++ b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithAlert.java
@@ -22,7 +22,7 @@
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.io.SmtpOutputOperator;
+import com.datatorrent.lib.io.smtp.SmtpIdempotentOutputOperator;
import com.datatorrent.lib.streamquery.DerbySqlStreamOperator;
import org.apache.hadoop.conf.Configuration;
@@ -85,7 +85,7 @@
boolean isSmtp = validateSmtpParams(conf);
if (isSmtp) {
- SmtpOutputOperator mailOper = dag.addOperator("mail", getSMTPOperator(conf));
+ SmtpIdempotentOutputOperator mailOper = dag.addOperator("mail", getSMTPOperator(conf));
dag.addStream("alert_mail", alertOper.alert, mailOper.input);
}
else {
@@ -107,7 +107,7 @@
return (isNotNull(From) && isNotNull(To) && isNotNull(smtpHost) && isNotNull(smtpPort) && isNotNull(smtpUser) && isNotNull(smtpPasswd));
}
- private SmtpOutputOperator getSMTPOperator(Configuration conf)
+ private SmtpIdempotentOutputOperator getSMTPOperator(Configuration conf)
{
String From = conf.get(ApplicationWithAlert.class.getName() + ".From");
String To = conf.get(ApplicationWithAlert.class.getName() + ".To");
@@ -117,7 +117,7 @@
String smtpPasswd = conf.get(ApplicationWithAlert.class.getName() + ".smtpPasswd");
Boolean useSsl = Boolean.valueOf(conf.get(ApplicationWithAlert.class.getName() + ".useSsl"));
- SmtpOutputOperator mailOper = new SmtpOutputOperator();
+ SmtpIdempotentOutputOperator mailOper = new SmtpIdempotentOutputOperator();
mailOper.setFrom(From);
Map<String, String> recipients = Maps.newHashMap();
diff --git a/library/src/main/java/com/datatorrent/lib/io/SmtpOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/SmtpOutputOperator.java
deleted file mode 100644
index 19c5888..0000000
--- a/library/src/main/java/com/datatorrent/lib/io/SmtpOutputOperator.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.lib.io;
-
-import com.datatorrent.api.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.Context.OperatorContext;
-
-import java.util.*;
-
-import javax.mail.*;
-import javax.mail.internet.InternetAddress;
-import javax.mail.internet.MimeMessage;
-import javax.validation.constraints.AssertTrue;
-import javax.validation.constraints.NotNull;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-
-/**
- * This operator outputs data to an smtp server.
- * <p></p>
- * @displayName Smtp Output
- * @category Output
- * @tags stmp, output operator
- *
- * @since 0.3.2
- */
-public class SmtpOutputOperator extends BaseOperator
-{
- public enum RecipientType
- {
- TO, CC, BCC
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(SmtpOutputOperator.class);
- @NotNull
- private String subject;
- @NotNull
- private String content;
- @NotNull
- private String from;
-
- private Map<String, String> recipients = Maps.newHashMap();
-
- private int smtpPort = 587;
- @NotNull
- private String smtpHost;
- private String smtpUserName;
- private String smtpPassword;
- private String contentType = "text/plain";
- private boolean useSsl = false;
- private boolean setupCalled = false;
-
- protected transient Properties properties = System.getProperties();
- protected transient Authenticator auth;
- protected transient Session session;
- protected transient Message message;
-
- /**
- * This is the port which receives the tuples that will be output to an smtp server.
- */
- public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
- {
- @Override
- public void process(Object t)
- {
- try {
- String mailContent = content.replace("{}", t.toString());
- message.setContent(mailContent, contentType);
- LOG.info("Sending email for tuple {}", t.toString());
- Transport.send(message);
- }
- catch (MessagingException ex) {
- LOG.error("Something wrong with sending email.", ex);
- }
- }
- };
-
- public String getSubject()
- {
- return subject;
- }
-
- public void setSubject(String subject)
- {
- this.subject = subject;
- resetMessage();
- }
-
- public String getContent()
- {
- return content;
- }
-
- public void setContent(String content)
- {
- this.content = content;
- resetMessage();
- }
-
- public String getFrom()
- {
- return from;
- }
-
- public void setFrom(String from)
- {
- this.from = from;
- resetMessage();
- }
-
- public int getSmtpPort()
- {
- return smtpPort;
- }
-
- public void setSmtpPort(int smtpPort)
- {
- this.smtpPort = smtpPort;
- reset();
- }
-
- public String getSmtpHost()
- {
- return smtpHost;
- }
-
- public void setSmtpHost(String smtpHost)
- {
- this.smtpHost = smtpHost;
- reset();
- }
-
- public String getSmtpUserName()
- {
- return smtpUserName;
- }
-
- public void setSmtpUserName(String smtpUserName)
- {
- this.smtpUserName = smtpUserName;
- reset();
- }
-
- public String getSmtpPassword()
- {
- return smtpPassword;
- }
-
- public void setSmtpPassword(String smtpPassword)
- {
- this.smtpPassword = smtpPassword;
- reset();
- }
-
- public String getContentType()
- {
- return contentType;
- }
-
- public void setContentType(String contentType)
- {
- this.contentType = contentType;
- resetMessage();
- }
-
- public boolean isUseSsl()
- {
- return useSsl;
- }
-
- public void setUseSsl(boolean useSsl)
- {
- this.useSsl = useSsl;
- reset();
- }
-
- @Override
- public void setup(OperatorContext context)
- {
- setupCalled = true;
- reset();
- }
-
- private void reset()
- {
- if (!setupCalled) {
- return;
- }
- if (!StringUtils.isBlank(smtpPassword)) {
- properties.setProperty("mail.smtp.auth", "true");
- properties.setProperty("mail.smtp.starttls.enable", "true");
- if (useSsl) {
- properties.setProperty("mail.smtp.socketFactory.port", String.valueOf(smtpPort));
- properties.setProperty("mail.smtp.socketFactory.class", "javax.net.ssl.SSLSocketFactory");
- properties.setProperty("mail.smtp.socketFactory.fallback", "false");
- }
-
- auth = new Authenticator()
- {
- @Override
- protected PasswordAuthentication getPasswordAuthentication()
- {
- return new PasswordAuthentication(smtpUserName, smtpPassword);
- }
-
- };
- }
-
- properties.setProperty("mail.smtp.host", smtpHost);
- properties.setProperty("mail.smtp.port", String.valueOf(smtpPort));
- session = Session.getInstance(properties, auth);
- resetMessage();
- }
-
- private void resetMessage()
- {
- if (!setupCalled) {
- return;
- }
- try {
- message = new MimeMessage(session);
- message.setFrom(new InternetAddress(from));
- for (Map.Entry<String, String> entry : recipients.entrySet()) {
- RecipientType type = RecipientType.valueOf(entry.getKey().toUpperCase());
- Message.RecipientType recipientType;
- switch (type) {
- case TO:
- recipientType = Message.RecipientType.TO;
- break;
- case CC:
- recipientType = Message.RecipientType.CC;
- break;
- case BCC:
- default:
- recipientType = Message.RecipientType.BCC;
- break;
- }
- String[] addresses = entry.getValue().split(",");
- for (String address : addresses) {
- message.addRecipient(recipientType, new InternetAddress(address));
- }
- }
- message.setSubject(subject);
- LOG.debug("all recipients {}", Arrays.toString(message.getAllRecipients()));
- }
- catch (MessagingException ex) {
- throw new RuntimeException(ex);
- }
- }
-
- public Map<String, String> getRecipients()
- {
- return recipients;
- }
-
- /**
- * @param recipients : map from recipient type to coma separated list of addresses for e.g. to->abc@xyz.com,def@xyz.com
- */
- public void setRecipients(Map<String, String> recipients)
- {
- this.recipients = recipients;
- resetMessage();
- }
-
- @AssertTrue(message = "Please verify the recipients set")
- private boolean isValid()
- {
- if (recipients.isEmpty()) {
- return false;
- }
- for (Map.Entry<String, String> entry : recipients.entrySet()) {
- if (entry.getKey().toUpperCase().equalsIgnoreCase(RecipientType.TO.toString())) {
- if (entry.getValue() != null && entry.getValue().length() > 0) {
- return true;
- }
- return false;
- }
- }
- return false;
- }
-}
diff --git a/library/src/main/java/com/datatorrent/lib/io/smtp/SmtpIdempotentOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/smtp/SmtpIdempotentOutputOperator.java
new file mode 100644
index 0000000..04df10b
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/smtp/SmtpIdempotentOutputOperator.java
@@ -0,0 +1,548 @@
+ /*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.io.smtp;
+
+import com.datatorrent.api.BaseOperator;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Operator;
+import com.datatorrent.common.util.DTThrowable;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+
+import java.util.*;
+
+import javax.mail.*;
+import javax.mail.internet.InternetAddress;
+import javax.mail.internet.MimeMessage;
+import javax.validation.constraints.AssertTrue;
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This operator outputs data to an smtp server.
+ * <p>
+ * </p>
+ * @displayName Smtp Output
+ * @category Output
+ * @tags stmp, output operator
+ *
+ * @since 0.3.2
+ */
+public class SmtpIdempotentOutputOperator extends BaseOperator implements Operator.CheckpointListener, Operator.IdleTimeHandler
+{
+ public enum RecipientType
+ {
+ TO, CC, BCC
+ }
+
+ @NotNull
+ private String subject;
+ @NotNull
+ private String content;
+ @NotNull
+ private String from;
+
+ private Map<String, String> recipients = Maps.newHashMap();
+
+ private int smtpPort = 689;
+ private transient long sleepTimeMillis;
+ @NotNull
+ private String smtpHost;
+ private String smtpUserName;
+ private String smtpPassword;
+ private String contentType = "text/plain";
+ private boolean useSsl = false;
+ private boolean setupCalled = false;
+ private transient Queue<String> messagesSent;
+ private transient SMTPSenderThread smtpSenderThread;
+ protected transient Properties properties = System.getProperties();
+ protected transient Authenticator auth;
+ protected transient Session session;
+ protected transient Message message;
+ private final transient AtomicReference<Throwable> throwable;
+ protected IdempotentStorageManager idempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager();
+ protected Queue<String> waiting;
+ protected int countMessageSent;
+ private transient int countMessagesToBeSkipped;
+
+ private transient long currentWindowId;
+ private transient int operatorId;
+ private transient long largestRecoveryWindow;
+ private transient int countOfTuples;
+ private transient int numWaiting;
+ private transient final Object sync;
+
+ public SmtpIdempotentOutputOperator()
+ {
+ throwable = new AtomicReference<Throwable>();
+ waiting = new LinkedList<String>();
+ sync = new Object();
+ }
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ setupCalled = true;
+ operatorId = context.getId();
+ idempotentStorageManager.setup(context);
+ sleepTimeMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+ smtpSenderThread = new SMTPSenderThread();
+ messagesSent = new LinkedList<String>();
+ reset();
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ countOfTuples = 0;
+ largestRecoveryWindow = idempotentStorageManager.getLargestRecoveryWindow();
+ currentWindowId = windowId;
+ if (windowId <= largestRecoveryWindow) {
+ numWaiting = waiting.size();
+ countMessagesToBeSkipped = restore(windowId);
+ if (countMessagesToBeSkipped > numWaiting) {
+ // These messages were already sent in this window, so remove them from waiting.numWaiting could be size of waiting from previous window.
+ for (int i = 0; i < numWaiting; i++) {
+ waiting.poll();
+ }
+ // This diff could be number of tuples coming in this window, check in processTuple.
+ countMessagesToBeSkipped = countMessagesToBeSkipped - numWaiting;
+
+ }
+ else if (countMessagesToBeSkipped <= numWaiting) //It is possible that rest of waiting messages were sent in next window.
+ {
+ for (int i = 0; i < countMessagesToBeSkipped; i++) {
+ waiting.poll();
+ }
+ countMessagesToBeSkipped = 0;
+ }
+ }
+
+ }
+
+ protected void completed(String sentMessage)
+ {
+ messagesSent.add(sentMessage);
+ }
+
+ @Override
+ public void teardown()
+ {
+ idempotentStorageManager.teardown();
+
+ try {
+ smtpSenderThread.stopService();
+ }
+ catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+
+ super.teardown();
+
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ try {
+ idempotentStorageManager.deleteUpTo(operatorId, windowId);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * This is the port which receives the tuples that will be output to an smtp server.
+ */
+ public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+ {
+ @Override
+ public void process(Object t)
+ {
+ String mailContent = content.replace("{}", t.toString());
+ // Recovery case
+ if (currentWindowId <= largestRecoveryWindow) {
+ countOfTuples++;
+ /*There could be more duplicate messages coming in this window, we are not sure of state in waiting queue yet,
+ hence add these to waiting queue. If they are already sent, then they will be removed in next or further
+ windows according to logic implemented in beginwindow.*/
+ if (countOfTuples > countMessagesToBeSkipped) {
+ waiting.add(mailContent);
+ }
+ else {
+ return;
+ }
+ }
+ else {
+ waiting.add(mailContent);
+ smtpSenderThread.messageRcvdQueue.add(mailContent);
+ }
+ }
+
+ };
+
+ @Override
+ public void handleIdleTime()
+ {
+ if (messagesSent.isEmpty() && throwable.get() == null) {
+ try {
+ Thread.sleep(sleepTimeMillis);
+ }
+ catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ else if (throwable.get() != null) {
+ DTThrowable.rethrow(throwable.get());
+ }
+ else {
+ /**
+ * Remove all the messages from waiting which have been sent.
+ */
+ if (!waiting.isEmpty()) {
+ while (!messagesSent.isEmpty()) {
+ waiting.remove(messagesSent.poll());
+ }
+ }
+ }
+ }
+
+ public String getSubject()
+ {
+ return subject;
+ }
+
+ public void setSubject(String subject)
+ {
+ this.subject = subject;
+ resetMessage();
+ }
+
+ public String getContent()
+ {
+ return content;
+ }
+
+ public void setContent(String content)
+ {
+ this.content = content;
+ resetMessage();
+ }
+
+ public String getFrom()
+ {
+ return from;
+ }
+
+ public void setFrom(String from)
+ {
+ this.from = from;
+ resetMessage();
+ }
+
+ public int getSmtpPort()
+ {
+ return smtpPort;
+ }
+
+ public void setSmtpPort(int smtpPort)
+ {
+ this.smtpPort = smtpPort;
+ reset();
+ }
+
+ public String getSmtpHost()
+ {
+ return smtpHost;
+ }
+
+ public void setSmtpHost(String smtpHost)
+ {
+ this.smtpHost = smtpHost;
+ reset();
+ }
+
+ public String getSmtpUserName()
+ {
+ return smtpUserName;
+ }
+
+ public void setSmtpUserName(String smtpUserName)
+ {
+ this.smtpUserName = smtpUserName;
+ reset();
+ }
+
+ public String getSmtpPassword()
+ {
+ return smtpPassword;
+ }
+
+ public void setSmtpPassword(String smtpPassword)
+ {
+ this.smtpPassword = smtpPassword;
+ reset();
+ }
+
+ public String getContentType()
+ {
+ return contentType;
+ }
+
+ public void setContentType(String contentType)
+ {
+ this.contentType = contentType;
+ resetMessage();
+ }
+
+ public boolean isUseSsl()
+ {
+ return useSsl;
+ }
+
+ public void setUseSsl(boolean useSsl)
+ {
+ this.useSsl = useSsl;
+ reset();
+ }
+
+ private void reset()
+ {
+ if (!setupCalled) {
+ return;
+ }
+ if (!StringUtils.isBlank(smtpPassword)) {
+ properties.setProperty("mail.smtp.auth", "true");
+ properties.setProperty("mail.smtp.starttls.enable", "true");
+ if (useSsl) {
+ properties.setProperty("mail.smtp.socketFactory.port", String.valueOf(smtpPort));
+ properties.setProperty("mail.smtp.socketFactory.class", "javax.net.ssl.SSLSocketFactory");
+ properties.setProperty("mail.smtp.socketFactory.fallback", "false");
+ }
+
+ auth = new Authenticator()
+ {
+ @Override
+ protected PasswordAuthentication getPasswordAuthentication()
+ {
+ return new PasswordAuthentication(smtpUserName, smtpPassword);
+ }
+
+ };
+ }
+
+ properties.setProperty("mail.smtp.host", smtpHost);
+ properties.setProperty("mail.smtp.port", String.valueOf(smtpPort));
+ session = Session.getInstance(properties, auth);
+ resetMessage();
+ }
+
+ private void resetMessage()
+ {
+ if (!setupCalled) {
+ return;
+ }
+ try {
+ message = new MimeMessage(session);
+ message.setFrom(new InternetAddress(from));
+ for (Map.Entry<String, String> entry: recipients.entrySet()) {
+ RecipientType type = RecipientType.valueOf(entry.getKey().toUpperCase());
+ Message.RecipientType recipientType;
+ switch (type) {
+ case TO:
+ recipientType = Message.RecipientType.TO;
+ break;
+ case CC:
+ recipientType = Message.RecipientType.CC;
+ break;
+ case BCC:
+ default:
+ recipientType = Message.RecipientType.BCC;
+ break;
+ }
+ String[] addresses = entry.getValue().split(",");
+ for (String address: addresses) {
+ message.addRecipient(recipientType, new InternetAddress(address));
+ }
+ }
+ message.setSubject(subject);
+ LOG.debug("all recipients {}", Arrays.toString(message.getAllRecipients()));
+ }
+ catch (MessagingException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ public Map<String, String> getRecipients()
+ {
+ return recipients;
+ }
+
+ /**
+ * @param recipients : map from recipient type to coma separated list of addresses for e.g. to->abc@xyz.com,def@xyz.com
+ */
+ public void setRecipients(Map<String, String> recipients)
+ {
+ this.recipients = recipients;
+ resetMessage();
+ }
+
+ @AssertTrue(message = "Please verify the recipients set")
+ private boolean isValid()
+ {
+ if (recipients.isEmpty()) {
+ return false;
+ }
+ for (Map.Entry<String, String> entry: recipients.entrySet()) {
+ if (entry.getKey().toUpperCase().equalsIgnoreCase(RecipientType.TO.toString())) {
+ if (entry.getValue() != null && entry.getValue().length() > 0) {
+ return true;
+ }
+ return false;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void endWindow()
+ {
+ if (currentWindowId == largestRecoveryWindow) {
+ if (!waiting.isEmpty()) {
+ smtpSenderThread.messageRcvdQueue.addAll(waiting);
+ }
+ }
+ else if (currentWindowId > largestRecoveryWindow) {
+ try {
+ synchronized (sync) {
+ idempotentStorageManager.save(countMessageSent, operatorId, currentWindowId);
+ LOG.debug("idempotent manager saves count {}", countMessageSent);
+ countMessageSent = 0;
+ }
+ }
+ catch (IOException e) {
+ throw new RuntimeException("saving recovery", e);
+ }
+
+ }
+ if (!waiting.isEmpty()) {
+ while (!messagesSent.isEmpty()) {
+ waiting.remove(messagesSent.poll());
+ }
+ }
+ }
+
+ protected int restore(long windowId)
+ {
+ try {
+ Map<Integer, Object> recoveryDataPerOperator = idempotentStorageManager.load(windowId);
+ for (Object recovery: recoveryDataPerOperator.values()) {
+ countMessagesToBeSkipped = (Integer)recovery;
+ }
+ }
+ catch (IOException ex) {
+ throw new RuntimeException("replay", ex);
+ }
+ return countMessagesToBeSkipped;
+ }
+
+ @Override
+ public void checkpointed(long windowId)
+ {
+ }
+
+ /**
+ * Sets the idempotent storage manager on the operator.
+ *
+ * @param idempotentStorageManager an {@link IdempotentStorageManager}
+ */
+ public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager)
+ {
+ this.idempotentStorageManager = idempotentStorageManager;
+ }
+
+ /**
+ * Returns the idempotent storage manager which is being used by the operator.
+ *
+ * @return the idempotent storage manager.
+ */
+ public IdempotentStorageManager getIdempotentStorageManager()
+ {
+ return idempotentStorageManager;
+ }
+
+ private class SMTPSenderThread implements Runnable
+ {
+ private transient final BlockingQueue<String> messageRcvdQueue;
+ private transient volatile boolean running;
+
+ SMTPSenderThread()
+ {
+ messageRcvdQueue = new LinkedBlockingQueue<String>();
+ Thread messageServiceThread = new Thread(this, "SMTPSenderThread");
+ messageServiceThread.start();
+ }
+
+ private void stopService() throws IOException
+ {
+ running = false;
+ }
+
+ @Override
+ public void run()
+ {
+ running = true;
+ while (running) {
+ String mailContent = messageRcvdQueue.poll();
+ if (mailContent != null) {
+ try {
+ message.setContent(mailContent, contentType);
+ synchronized (sync) {
+ Transport.send(message);
+ }
+ }
+ catch (MessagingException ex) {
+ running = false;
+ throwable.set(ex);
+ }
+ synchronized (sync) {
+ countMessageSent++;
+ }
+
+ LOG.debug("message is {}", message);
+ completed(mailContent);
+
+ }
+ }
+
+ }
+
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(SmtpIdempotentOutputOperator.class);
+
+}
diff --git a/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java
deleted file mode 100644
index dd8873c..0000000
--- a/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.lib.io;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.mail.Message;
-import javax.mail.internet.InternetAddress;
-import javax.mail.internet.MimeMessage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.StreamingApplication;
-import com.google.common.collect.Maps;
-import com.icegreen.greenmail.util.GreenMail;
-import com.icegreen.greenmail.util.ServerSetup;
-import com.icegreen.greenmail.util.ServerSetupTest;
-
-public class SmtpOutputOperatorTest
-{
-
- String subject = "ALERT!";
- String content = "This is an SMTP operator test {}";
- String from = "jenkins@datatorrent.com";
- String to = "jenkins@datatorrent.com";
- String cc = "jenkinsCC@datatorrent.com";
- GreenMail greenMail = null;
- SmtpOutputOperator node;
- Map<String, String> data;
-
- @Before
- public void setup() throws Exception
- {
- node = new SmtpOutputOperator();
- greenMail = new GreenMail(ServerSetupTest.ALL);
- greenMail.start();
- node.setFrom(from);
- node.setContent(content);
- node.setSmtpHost("127.0.0.1");
- node.setSmtpPort(ServerSetupTest.getPortOffset() + ServerSetup.SMTP.getPort());
- node.setSmtpUserName(from);
- node.setSmtpPassword("<password>");
- //node.setUseSsl(true);
- node.setSubject(subject);
- data = new HashMap<String, String>();
- data.put("alertkey", "alertvalue");
-
- }
-
- @After
- public void teardown() throws Exception
- {
- node.teardown();
- greenMail.stop();
- Thread.sleep(1000);
- }
-
- @Test
- public void testSmtpOutputNode() throws Exception
- {
- Map<String, String> recipients = Maps.newHashMap();
- recipients.put("to", to + "," + cc);
- recipients.put("cc", cc);
- node.setRecipients(recipients);
- node.setup(null);
- node.beginWindow(1000);
- node.input.process(data);
- node.endWindow();
- Assert.assertTrue(greenMail.waitForIncomingEmail(5000, 1));
- MimeMessage[] messages = greenMail.getReceivedMessages();
- Assert.assertEquals(3, messages.length);
- String receivedContent = messages[0].getContent().toString().trim();
- String expectedContent = content.replace("{}", data.toString()).trim();
-
- Assert.assertTrue(expectedContent.equals(receivedContent));
- Assert.assertEquals(from, ((InternetAddress) messages[0].getFrom()[0]).getAddress());
- Assert.assertEquals(to, messages[0].getRecipients(Message.RecipientType.TO)[0].toString());
- Assert.assertEquals(cc, messages[0].getRecipients(Message.RecipientType.TO)[1].toString());
- Assert.assertEquals(cc, messages[0].getRecipients(Message.RecipientType.CC)[0].toString());
-
- }
-
- @Test
- public void test() throws Exception
- {
- Map<String, String> recipients = Maps.newHashMap();
- recipients.put("to", to);
- node.setRecipients(recipients);
- node.setup(null);
- node.beginWindow(1000);
- node.input.process(data);
- node.endWindow();
- Assert.assertTrue(greenMail.waitForIncomingEmail(5000, 1));
- MimeMessage[] messages = greenMail.getReceivedMessages();
- Assert.assertEquals(1, messages.length);
- String receivedContent = messages[0].getContent().toString().trim();
- String expectedContent = content.replace("{}", data.toString()).trim();
-
- Assert.assertTrue(expectedContent.equals(receivedContent));
- Assert.assertEquals(from, ((InternetAddress) messages[0].getFrom()[0]).getAddress());
- Assert.assertEquals(to, messages[0].getAllRecipients()[0].toString());
- }
-
- @Test
- public void testProperties() throws Exception
- {
- Configuration conf = new Configuration(false);
- conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.subject", subject);
- conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.content", content);
- conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.from", from);
- conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.smtpHost", "127.0.0.1");
- conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.smtpUserName", from);
- conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.smtpPassword", "<password>");
- conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.recipients.TO", to + "," + cc);
- conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.recipients.CC", cc);
-
- final AtomicReference<SmtpOutputOperator> o1 = new AtomicReference<SmtpOutputOperator>();
- StreamingApplication app = new StreamingApplication() {
- @Override
- public void populateDAG(DAG dag, Configuration conf)
- {
- o1.set(dag.addOperator("o1", new SmtpOutputOperator()));
- }
- };
-
- LocalMode lma = LocalMode.newInstance();
- lma.prepareDAG(app, conf);
- Assert.assertEquals("checking TO list", to + "," + cc, o1.get().getRecipients().get("TO"));
- Assert.assertEquals("checking CC list", cc, o1.get().getRecipients().get("CC"));
-
- }
-}
diff --git a/library/src/test/java/com/datatorrent/lib/io/smtp/SmtpOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/smtp/SmtpOutputOperatorTest.java
new file mode 100644
index 0000000..f00fce2
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/smtp/SmtpOutputOperatorTest.java
@@ -0,0 +1,295 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.io.smtp;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.mail.Message;
+import javax.mail.internet.InternetAddress;
+import javax.mail.internet.MimeMessage;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.google.common.collect.Maps;
+import com.icegreen.greenmail.util.GreenMail;
+import com.icegreen.greenmail.util.ServerSetup;
+import com.icegreen.greenmail.util.ServerSetupTest;
+
+import org.junit.*;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.util.TestUtils;
+
+import com.datatorrent.api.*;
+import java.util.logging.Level;
+
+public class SmtpOutputOperatorTest
+{
+
+ String subject = "ALERT!";
+ String content = "This is an SMTP operator test {}";
+ String from = "jenkins@datatorrent.com";
+ String to = "jenkins@datatorrent.com";
+ String cc = "jenkinsCC@datatorrent.com";
+ GreenMail greenMail = null;
+ SmtpIdempotentOutputOperator node;
+
+ Map<String, String> data;
+
+ public class TestMeta extends TestWatcher
+ {
+ public String dir = null;
+ Context.OperatorContext context;
+ IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager();
+ @Override
+ protected void starting(org.junit.runner.Description description)
+ {
+ String methodName = description.getMethodName();
+ String className = description.getClassName();
+ this.dir = "target/" + className + "/" + methodName;
+ Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributes.put(DAG.DAGContext.APPLICATION_ID, "FileInputOperatorTest");
+ context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+ node = new SmtpIdempotentOutputOperator();
+ greenMail = new GreenMail(ServerSetupTest.ALL);
+ greenMail.start();
+ node.setFrom(from);
+ node.setContent(content);
+ node.setSmtpHost("127.0.0.1");
+ node.setSmtpPort(ServerSetupTest.getPortOffset() + ServerSetup.SMTP.getPort());
+ node.setSmtpUserName(from);
+ node.setSmtpPassword("<password>");
+ data = new HashMap<String, String>();
+ data.put("alertkey", "alertvalue");
+
+ manager.setRecoveryPath(testMeta.dir + "/recovery");
+ manager.setup(testMeta.context);
+ node.setIdempotentStorageManager(manager);
+
+ }
+
+ @Override
+ protected void finished(Description description)
+ {
+ super.finished(description);
+ FileUtils.deleteQuietly(new File(this.dir));
+ greenMail.stop();
+ try {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ }
+
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+
+
+ @Test
+ public void testSmtpOutputNode() throws Exception
+ {
+ Map<String, String> recipients = Maps.newHashMap();
+ recipients.put("to", to + "," + cc);
+ recipients.put("cc", cc);
+ node.setRecipients(recipients);
+ node.setSubject("hello");
+ node.setup(testMeta.context);
+ node.beginWindow(0);
+ String data = "First test message";
+ node.input.process(data);
+ Thread.sleep(5000);
+ node.endWindow();
+ node.teardown();
+ Thread.sleep(5000);
+ Assert.assertTrue(greenMail.waitForIncomingEmail(5000, 1));
+ MimeMessage[] messages = greenMail.getReceivedMessages();
+ Assert.assertEquals(3, messages.length);
+ String receivedContent = messages[0].getContent().toString().trim();
+ String expectedContent = content.replace("{}", data.toString()).trim();
+
+ Assert.assertTrue(expectedContent.equals(receivedContent));
+ Assert.assertEquals(from, ((InternetAddress)messages[0].getFrom()[0]).getAddress());
+ Assert.assertEquals(to, messages[0].getRecipients(Message.RecipientType.TO)[0].toString());
+ Assert.assertEquals(cc, messages[0].getRecipients(Message.RecipientType.TO)[1].toString());
+ Assert.assertEquals(cc, messages[0].getRecipients(Message.RecipientType.CC)[0].toString());
+ }
+
+ @Test
+ public void test() throws Exception
+ {
+ Map<String, String> recipients = Maps.newHashMap();
+ recipients.put("to", to);
+ node.setRecipients(recipients);
+ node.setup(testMeta.context);
+ node.beginWindow(0);
+ String data = "First test message";
+ node.input.process(data);
+ node.endWindow();
+ node.teardown();
+ Thread.sleep(500);
+ Assert.assertTrue(greenMail.waitForIncomingEmail(5000, 1));
+ MimeMessage[] messages = greenMail.getReceivedMessages();
+ Assert.assertEquals(1, messages.length);
+ String receivedContent = messages[0].getContent().toString().trim();
+ String expectedContent = content.replace("{}", data.toString()).trim();
+
+ Assert.assertTrue(expectedContent.equals(receivedContent));
+ Assert.assertEquals(from, ((InternetAddress)messages[0].getFrom()[0]).getAddress());
+ Assert.assertEquals(to, messages[0].getAllRecipients()[0].toString());
+ }
+
+ @Test
+ public void testProperties() throws Exception
+ {
+ Configuration conf = new Configuration(false);
+ conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.subject", subject);
+ conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.content", content);
+ conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.from", from);
+ conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.smtpHost", "127.0.0.1");
+ conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.smtpUserName", from);
+ conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.smtpPassword", "<password>");
+ conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.recipients.TO", to + "," + cc);
+ conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.recipients.CC", cc);
+
+ final AtomicReference<SmtpIdempotentOutputOperator> o1 = new AtomicReference<SmtpIdempotentOutputOperator>();
+ StreamingApplication app = new StreamingApplication()
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ o1.set(dag.addOperator("o1", new SmtpIdempotentOutputOperator()));
+ }
+
+ };
+
+ LocalMode lma = LocalMode.newInstance();
+ lma.prepareDAG(app, conf);
+ Assert.assertEquals("checking TO list", to + "," + cc, o1.get().getRecipients().get("TO"));
+ Assert.assertEquals("checking CC list", cc, o1.get().getRecipients().get("CC"));
+ }
+
+ @Test
+ public void testIdempotency() throws Exception
+ {
+ Map<String, String> recipients = Maps.newHashMap();
+
+ recipients.put("to", to);
+ node.setup(testMeta.context);
+ node.setRecipients(recipients);
+ node.setSubject("hello");
+ for (long wid = 1; wid < 10; wid++) {
+ node.beginWindow(wid);
+ String input = wid + "test message";
+ node.input.process(input);
+ if(wid == 5 || wid == 6 || wid == 7){
+ input = wid +"hello"+"test message";
+ node.input.process(input);
+ }
+ node.endWindow();
+ Thread.sleep(10000);
+ if (wid == 7) {
+ SmtpIdempotentOutputOperator newOp = TestUtils.clone(new Kryo(), node);
+ node.teardown();
+ newOp.setup(testMeta.context);
+ newOp.beginWindow(5);
+ String inputTest = 5 + "test message";
+ newOp.input.process(inputTest);
+ inputTest = 5 +"hello"+"test message";
+ newOp.endWindow();
+ Thread.sleep(5000);
+
+ newOp.beginWindow(6);
+ inputTest = 6 + "test message";
+ newOp.input.process(inputTest);
+ inputTest = 6 +"hello"+"test message";
+ newOp.input.process(inputTest);
+ newOp.endWindow();
+ Thread.sleep(5000);
+
+ newOp.beginWindow(7);
+ inputTest = 7 + "test message";
+ newOp.input.process(inputTest);
+ inputTest = 7 +"hello"+"test message";
+ newOp.input.process(inputTest);
+ newOp.endWindow();
+ Thread.sleep(5000);
+ //normal processing
+ newOp.beginWindow(8);
+ inputTest = 8 + "test message";
+ newOp.input.process(inputTest);
+ newOp.endWindow();
+ Thread.sleep(10000);
+ newOp.teardown();
+ break;
+ }
+ }
+ Assert.assertTrue(greenMail.waitForIncomingEmail(5000, 1));
+
+ MimeMessage[] messages = greenMail.getReceivedMessages();
+ Assert.assertEquals(11, messages.length);
+ for (int i = 0; i < 5; i++) {
+ String receivedContent = messages[i].getContent().toString().trim();
+ logger.debug("received content is {}", receivedContent);
+ String expectedContent = content.replace("{}", i+1 + "test message").trim();
+ Assert.assertTrue(expectedContent.equals(receivedContent));
+ }
+
+ String receivedContent = messages[5].getContent().toString().trim();
+ logger.debug("received content is {}", receivedContent);
+ String expectedContent = content.replace("{}", 5 + "hellotest message").trim();
+ Assert.assertTrue(expectedContent.equals(receivedContent));
+ receivedContent = messages[6].getContent().toString().trim();
+ logger.debug("received content is {}", receivedContent);
+ expectedContent = content.replace("{}", 6 + "test message").trim();
+ Assert.assertTrue(expectedContent.equals(receivedContent));
+ receivedContent = messages[7].getContent().toString().trim();
+ logger.debug("received content is {}", receivedContent);
+ expectedContent = content.replace("{}", 6 + "hellotest message").trim();
+ Assert.assertTrue(expectedContent.equals(receivedContent));
+ receivedContent = messages[8].getContent().toString().trim();
+ logger.debug("received content is {}", receivedContent);
+ expectedContent = content.replace("{}", 7 + "test message").trim();
+ Assert.assertTrue(expectedContent.equals(receivedContent));
+ receivedContent = messages[9].getContent().toString().trim();
+ logger.debug("received content is {}", receivedContent);
+ expectedContent = content.replace("{}", 7 + "hellotest message").trim();
+ Assert.assertTrue(expectedContent.equals(receivedContent));
+ receivedContent = messages[10].getContent().toString().trim();
+ logger.debug("received content is {}", receivedContent);
+ expectedContent = content.replace("{}", 8 + "test message").trim();
+ Assert.assertTrue(expectedContent.equals(receivedContent));
+
+ Assert.assertEquals(from, ((InternetAddress)messages[0].getFrom()[0]).getAddress());
+ Assert.assertEquals(to, messages[0].getRecipients(Message.RecipientType.TO)[0].toString());
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(SmtpOutputOperatorTest.class);
+}
\ No newline at end of file