changes for smtp.
diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java
index 302e703..1d97625 100644
--- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java
+++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/Application.java
@@ -23,7 +23,7 @@
 import com.datatorrent.demos.machinedata.operator.MachineInfoAveragingOperator;
 import com.datatorrent.demos.machinedata.operator.MachineInfoAveragingPrerequisitesOperator;
 import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator;
-import com.datatorrent.lib.io.SmtpOutputOperator;
+import com.datatorrent.lib.io.smtp.SmtpIdempotentOutputOperator;
 
 import java.util.Map;
 
@@ -58,7 +58,7 @@
     MachineInfoAveragingOperator averageOperator = dag.addOperator("AverageCalculator", MachineInfoAveragingOperator.class);
     RedisKeyValPairOutputOperator<MachineKey, Map<String, String>> redisAvgOperator = dag.addOperator("Persister", new RedisKeyValPairOutputOperator<MachineKey, Map<String, String>>());
     dag.addStream("Average", averageOperator.outputPort, redisAvgOperator.input);
-    SmtpOutputOperator smtpOutputOperator = dag.addOperator("Alerter", new SmtpOutputOperator());
+    SmtpIdempotentOutputOperator smtpOutputOperator = dag.addOperator("Alerter", new SmtpIdempotentOutputOperator());
     dag.addStream("Aggregates", prereqAverageOper.outputPort, averageOperator.inputPort);
     dag.addStream("Alerts", averageOperator.smtpAlert, smtpOutputOperator.input);
     return prereqAverageOper;
diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithAlert.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithAlert.java
index 945bcc8..9eaa002 100644
--- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithAlert.java
+++ b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithAlert.java
@@ -22,7 +22,7 @@
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 
 import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.io.SmtpOutputOperator;
+import com.datatorrent.lib.io.smtp.SmtpIdempotentOutputOperator;
 import com.datatorrent.lib.streamquery.DerbySqlStreamOperator;
 
 import org.apache.hadoop.conf.Configuration;
@@ -85,7 +85,7 @@
     boolean isSmtp = validateSmtpParams(conf);
 
     if (isSmtp) {
-      SmtpOutputOperator mailOper = dag.addOperator("mail", getSMTPOperator(conf));
+      SmtpIdempotentOutputOperator mailOper = dag.addOperator("mail", getSMTPOperator(conf));
       dag.addStream("alert_mail", alertOper.alert, mailOper.input);
     }
     else {
@@ -107,7 +107,7 @@
     return (isNotNull(From) && isNotNull(To) && isNotNull(smtpHost) && isNotNull(smtpPort) && isNotNull(smtpUser) && isNotNull(smtpPasswd));
   }
 
-  private SmtpOutputOperator getSMTPOperator(Configuration conf)
+  private SmtpIdempotentOutputOperator getSMTPOperator(Configuration conf)
   {
     String From = conf.get(ApplicationWithAlert.class.getName() + ".From");
     String To = conf.get(ApplicationWithAlert.class.getName() + ".To");
@@ -117,7 +117,7 @@
     String smtpPasswd = conf.get(ApplicationWithAlert.class.getName() + ".smtpPasswd");
     Boolean useSsl = Boolean.valueOf(conf.get(ApplicationWithAlert.class.getName() + ".useSsl"));
 
-    SmtpOutputOperator mailOper = new SmtpOutputOperator();
+    SmtpIdempotentOutputOperator mailOper = new SmtpIdempotentOutputOperator();
 
     mailOper.setFrom(From);
     Map<String, String> recipients = Maps.newHashMap();
diff --git a/library/src/main/java/com/datatorrent/lib/io/SmtpOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/SmtpOutputOperator.java
deleted file mode 100644
index 19c5888..0000000
--- a/library/src/main/java/com/datatorrent/lib/io/SmtpOutputOperator.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.lib.io;
-
-import com.datatorrent.api.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.Context.OperatorContext;
-
-import java.util.*;
-
-import javax.mail.*;
-import javax.mail.internet.InternetAddress;
-import javax.mail.internet.MimeMessage;
-import javax.validation.constraints.AssertTrue;
-import javax.validation.constraints.NotNull;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-
-/**
- * This operator outputs data to an smtp server.
- * <p></p>
- * @displayName Smtp Output
- * @category Output
- * @tags stmp, output operator
- *
- * @since 0.3.2
- */
-public class SmtpOutputOperator extends BaseOperator
-{
-  public enum RecipientType
-  {
-    TO, CC, BCC
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(SmtpOutputOperator.class);
-  @NotNull
-  private String subject;
-  @NotNull
-  private String content;
-  @NotNull
-  private String from;
-
-  private Map<String, String> recipients = Maps.newHashMap();
-
-  private int smtpPort = 587;
-  @NotNull
-  private String smtpHost;
-  private String smtpUserName;
-  private String smtpPassword;
-  private String contentType = "text/plain";
-  private boolean useSsl = false;
-  private boolean setupCalled = false;
-
-  protected transient Properties properties = System.getProperties();
-  protected transient Authenticator auth;
-  protected transient Session session;
-  protected transient Message message;
-
-  /**
-   * This is the port which receives the tuples that will be output to an smtp server.
-   */
-  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
-  {
-    @Override
-    public void process(Object t)
-    {
-      try {
-        String mailContent = content.replace("{}", t.toString());
-        message.setContent(mailContent, contentType);
-        LOG.info("Sending email for tuple {}", t.toString());
-        Transport.send(message);
-      }
-      catch (MessagingException ex) {
-        LOG.error("Something wrong with sending email.", ex);
-      }
-    }
-  };
-
-  public String getSubject()
-  {
-    return subject;
-  }
-
-  public void setSubject(String subject)
-  {
-    this.subject = subject;
-    resetMessage();
-  }
-
-  public String getContent()
-  {
-    return content;
-  }
-
-  public void setContent(String content)
-  {
-    this.content = content;
-    resetMessage();
-  }
-
-  public String getFrom()
-  {
-    return from;
-  }
-
-  public void setFrom(String from)
-  {
-    this.from = from;
-    resetMessage();
-  }
-
-  public int getSmtpPort()
-  {
-    return smtpPort;
-  }
-
-  public void setSmtpPort(int smtpPort)
-  {
-    this.smtpPort = smtpPort;
-    reset();
-  }
-
-  public String getSmtpHost()
-  {
-    return smtpHost;
-  }
-
-  public void setSmtpHost(String smtpHost)
-  {
-    this.smtpHost = smtpHost;
-    reset();
-  }
-
-  public String getSmtpUserName()
-  {
-    return smtpUserName;
-  }
-
-  public void setSmtpUserName(String smtpUserName)
-  {
-    this.smtpUserName = smtpUserName;
-    reset();
-  }
-
-  public String getSmtpPassword()
-  {
-    return smtpPassword;
-  }
-
-  public void setSmtpPassword(String smtpPassword)
-  {
-    this.smtpPassword = smtpPassword;
-    reset();
-  }
-
-  public String getContentType()
-  {
-    return contentType;
-  }
-
-  public void setContentType(String contentType)
-  {
-    this.contentType = contentType;
-    resetMessage();
-  }
-
-  public boolean isUseSsl()
-  {
-    return useSsl;
-  }
-
-  public void setUseSsl(boolean useSsl)
-  {
-    this.useSsl = useSsl;
-    reset();
-  }
-
-  @Override
-  public void setup(OperatorContext context)
-  {
-    setupCalled = true;
-    reset();
-  }
-
-  private void reset()
-  {
-    if (!setupCalled) {
-      return;
-    }
-    if (!StringUtils.isBlank(smtpPassword)) {
-      properties.setProperty("mail.smtp.auth", "true");
-      properties.setProperty("mail.smtp.starttls.enable", "true");
-      if (useSsl) {
-        properties.setProperty("mail.smtp.socketFactory.port", String.valueOf(smtpPort));
-        properties.setProperty("mail.smtp.socketFactory.class", "javax.net.ssl.SSLSocketFactory");
-        properties.setProperty("mail.smtp.socketFactory.fallback", "false");
-      }
-
-      auth = new Authenticator()
-      {
-        @Override
-        protected PasswordAuthentication getPasswordAuthentication()
-        {
-          return new PasswordAuthentication(smtpUserName, smtpPassword);
-        }
-
-      };
-    }
-
-    properties.setProperty("mail.smtp.host", smtpHost);
-    properties.setProperty("mail.smtp.port", String.valueOf(smtpPort));
-    session = Session.getInstance(properties, auth);
-    resetMessage();
-  }
-
-  private void resetMessage()
-  {
-    if (!setupCalled) {
-      return;
-    }
-    try {
-      message = new MimeMessage(session);
-      message.setFrom(new InternetAddress(from));
-      for (Map.Entry<String, String> entry : recipients.entrySet()) {
-        RecipientType type = RecipientType.valueOf(entry.getKey().toUpperCase());
-        Message.RecipientType recipientType;
-        switch (type) {
-          case TO:
-            recipientType = Message.RecipientType.TO;
-            break;
-          case CC:
-            recipientType = Message.RecipientType.CC;
-            break;
-          case BCC:
-          default:
-            recipientType = Message.RecipientType.BCC;
-            break;
-        }
-        String[] addresses = entry.getValue().split(",");
-        for (String address : addresses) {
-          message.addRecipient(recipientType, new InternetAddress(address));
-        }
-      }
-      message.setSubject(subject);
-      LOG.debug("all recipients {}", Arrays.toString(message.getAllRecipients()));
-    }
-    catch (MessagingException ex) {
-      throw new RuntimeException(ex);
-    }
-  }
-
-  public Map<String, String> getRecipients()
-  {
-    return recipients;
-  }
-
-  /**
-   * @param recipients : map from recipient type to coma separated list of addresses for e.g. to->abc@xyz.com,def@xyz.com
-   */
-  public void setRecipients(Map<String, String> recipients)
-  {
-    this.recipients = recipients;
-    resetMessage();
-  }
-
-  @AssertTrue(message = "Please verify the recipients set")
-  private boolean isValid()
-  {
-    if (recipients.isEmpty()) {
-      return false;
-    }
-    for (Map.Entry<String, String> entry : recipients.entrySet()) {
-      if (entry.getKey().toUpperCase().equalsIgnoreCase(RecipientType.TO.toString())) {
-        if (entry.getValue() != null && entry.getValue().length() > 0) {
-          return true;
-        }
-        return false;
-      }
-    }
-    return false;
-  }
-}
diff --git a/library/src/main/java/com/datatorrent/lib/io/smtp/SmtpIdempotentOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/smtp/SmtpIdempotentOutputOperator.java
new file mode 100644
index 0000000..04df10b
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/smtp/SmtpIdempotentOutputOperator.java
@@ -0,0 +1,548 @@
+  /*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.io.smtp;
+
+import com.datatorrent.api.BaseOperator;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Operator;
+import com.datatorrent.common.util.DTThrowable;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+
+import java.util.*;
+
+import javax.mail.*;
+import javax.mail.internet.InternetAddress;
+import javax.mail.internet.MimeMessage;
+import javax.validation.constraints.AssertTrue;
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This operator outputs data to an smtp server.
+ * <p>
+ * </p>
+ * @displayName Smtp Output
+ * @category Output
+ * @tags stmp, output operator
+ *
+ * @since 0.3.2
+ */
+public class SmtpIdempotentOutputOperator extends BaseOperator implements Operator.CheckpointListener, Operator.IdleTimeHandler
+{
+  public enum RecipientType
+  {
+    TO, CC, BCC
+  }
+
+  @NotNull
+  private String subject;
+  @NotNull
+  private String content;
+  @NotNull
+  private String from;
+
+  private Map<String, String> recipients = Maps.newHashMap();
+
+  private int smtpPort = 689;
+  private transient long sleepTimeMillis;
+  @NotNull
+  private String smtpHost;
+  private String smtpUserName;
+  private String smtpPassword;
+  private String contentType = "text/plain";
+  private boolean useSsl = false;
+  private boolean setupCalled = false;
+  private transient Queue<String> messagesSent;
+  private transient SMTPSenderThread smtpSenderThread;
+  protected transient Properties properties = System.getProperties();
+  protected transient Authenticator auth;
+  protected transient Session session;
+  protected transient Message message;
+  private final transient AtomicReference<Throwable> throwable;
+  protected IdempotentStorageManager idempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager();
+  protected Queue<String> waiting;
+  protected int countMessageSent;
+  private transient int countMessagesToBeSkipped;
+
+  private transient long currentWindowId;
+  private transient int operatorId;
+  private transient long largestRecoveryWindow;
+  private transient int countOfTuples;
+  private transient int numWaiting;
+  private transient final Object sync;
+
+  public SmtpIdempotentOutputOperator()
+  {
+    throwable = new AtomicReference<Throwable>();
+    waiting = new LinkedList<String>();
+    sync = new Object();
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    setupCalled = true;
+    operatorId = context.getId();
+    idempotentStorageManager.setup(context);
+    sleepTimeMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+    smtpSenderThread = new SMTPSenderThread();
+    messagesSent = new LinkedList<String>();
+    reset();
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    countOfTuples = 0;
+    largestRecoveryWindow = idempotentStorageManager.getLargestRecoveryWindow();
+    currentWindowId = windowId;
+    if (windowId <= largestRecoveryWindow) {
+      numWaiting = waiting.size();
+      countMessagesToBeSkipped = restore(windowId);
+      if (countMessagesToBeSkipped > numWaiting) {
+        // These messages were already sent in this window, so remove them from waiting.numWaiting could be size of waiting from previous window.
+        for (int i = 0; i < numWaiting; i++) {
+          waiting.poll();
+        }
+        // This diff could be number of tuples coming in this window, check in processTuple.
+        countMessagesToBeSkipped = countMessagesToBeSkipped - numWaiting;
+
+      }
+      else if (countMessagesToBeSkipped <= numWaiting) //It is possible that rest of waiting messages were sent in next window.
+      {
+        for (int i = 0; i < countMessagesToBeSkipped; i++) {
+          waiting.poll();
+        }
+        countMessagesToBeSkipped = 0;
+      }
+    }
+
+  }
+
+  protected void completed(String sentMessage)
+  {
+    messagesSent.add(sentMessage);
+  }
+
+  @Override
+  public void teardown()
+  {
+    idempotentStorageManager.teardown();
+
+    try {
+      smtpSenderThread.stopService();
+    }
+    catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+
+    super.teardown();
+
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    try {
+      idempotentStorageManager.deleteUpTo(operatorId, windowId);
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * This is the port which receives the tuples that will be output to an smtp server.
+   */
+  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+  {
+    @Override
+    public void process(Object t)
+    {
+      String mailContent = content.replace("{}", t.toString());
+      // Recovery case
+      if (currentWindowId <= largestRecoveryWindow) {
+        countOfTuples++;
+        /*There could be more duplicate messages coming in this window, we are not sure of state in waiting queue yet,
+         hence add these to waiting queue. If they are already sent, then they will be removed in next or further
+         windows according to logic implemented in beginwindow.*/
+        if (countOfTuples > countMessagesToBeSkipped) {
+          waiting.add(mailContent);
+        }
+        else {
+          return;
+        }
+      }
+      else {
+        waiting.add(mailContent);
+        smtpSenderThread.messageRcvdQueue.add(mailContent);
+      }
+    }
+
+  };
+
+  @Override
+  public void handleIdleTime()
+  {
+    if (messagesSent.isEmpty() && throwable.get() == null) {
+      try {
+        Thread.sleep(sleepTimeMillis);
+      }
+      catch (InterruptedException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+    else if (throwable.get() != null) {
+      DTThrowable.rethrow(throwable.get());
+    }
+    else {
+      /**
+       * Remove all the messages from waiting which have been sent.
+       */
+      if (!waiting.isEmpty()) {
+        while (!messagesSent.isEmpty()) {
+          waiting.remove(messagesSent.poll());
+        }
+      }
+    }
+  }
+
+  public String getSubject()
+  {
+    return subject;
+  }
+
+  public void setSubject(String subject)
+  {
+    this.subject = subject;
+    resetMessage();
+  }
+
+  public String getContent()
+  {
+    return content;
+  }
+
+  public void setContent(String content)
+  {
+    this.content = content;
+    resetMessage();
+  }
+
+  public String getFrom()
+  {
+    return from;
+  }
+
+  public void setFrom(String from)
+  {
+    this.from = from;
+    resetMessage();
+  }
+
+  public int getSmtpPort()
+  {
+    return smtpPort;
+  }
+
+  public void setSmtpPort(int smtpPort)
+  {
+    this.smtpPort = smtpPort;
+    reset();
+  }
+
+  public String getSmtpHost()
+  {
+    return smtpHost;
+  }
+
+  public void setSmtpHost(String smtpHost)
+  {
+    this.smtpHost = smtpHost;
+    reset();
+  }
+
+  public String getSmtpUserName()
+  {
+    return smtpUserName;
+  }
+
+  public void setSmtpUserName(String smtpUserName)
+  {
+    this.smtpUserName = smtpUserName;
+    reset();
+  }
+
+  public String getSmtpPassword()
+  {
+    return smtpPassword;
+  }
+
+  public void setSmtpPassword(String smtpPassword)
+  {
+    this.smtpPassword = smtpPassword;
+    reset();
+  }
+
+  public String getContentType()
+  {
+    return contentType;
+  }
+
+  public void setContentType(String contentType)
+  {
+    this.contentType = contentType;
+    resetMessage();
+  }
+
+  public boolean isUseSsl()
+  {
+    return useSsl;
+  }
+
+  public void setUseSsl(boolean useSsl)
+  {
+    this.useSsl = useSsl;
+    reset();
+  }
+
+  private void reset()
+  {
+    if (!setupCalled) {
+      return;
+    }
+    if (!StringUtils.isBlank(smtpPassword)) {
+      properties.setProperty("mail.smtp.auth", "true");
+      properties.setProperty("mail.smtp.starttls.enable", "true");
+      if (useSsl) {
+        properties.setProperty("mail.smtp.socketFactory.port", String.valueOf(smtpPort));
+        properties.setProperty("mail.smtp.socketFactory.class", "javax.net.ssl.SSLSocketFactory");
+        properties.setProperty("mail.smtp.socketFactory.fallback", "false");
+      }
+
+      auth = new Authenticator()
+      {
+        @Override
+        protected PasswordAuthentication getPasswordAuthentication()
+        {
+          return new PasswordAuthentication(smtpUserName, smtpPassword);
+        }
+
+      };
+    }
+
+    properties.setProperty("mail.smtp.host", smtpHost);
+    properties.setProperty("mail.smtp.port", String.valueOf(smtpPort));
+    session = Session.getInstance(properties, auth);
+    resetMessage();
+  }
+
+  private void resetMessage()
+  {
+    if (!setupCalled) {
+      return;
+    }
+    try {
+      message = new MimeMessage(session);
+      message.setFrom(new InternetAddress(from));
+      for (Map.Entry<String, String> entry: recipients.entrySet()) {
+        RecipientType type = RecipientType.valueOf(entry.getKey().toUpperCase());
+        Message.RecipientType recipientType;
+        switch (type) {
+          case TO:
+            recipientType = Message.RecipientType.TO;
+            break;
+          case CC:
+            recipientType = Message.RecipientType.CC;
+            break;
+          case BCC:
+          default:
+            recipientType = Message.RecipientType.BCC;
+            break;
+        }
+        String[] addresses = entry.getValue().split(",");
+        for (String address: addresses) {
+          message.addRecipient(recipientType, new InternetAddress(address));
+        }
+      }
+      message.setSubject(subject);
+      LOG.debug("all recipients {}", Arrays.toString(message.getAllRecipients()));
+    }
+    catch (MessagingException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  public Map<String, String> getRecipients()
+  {
+    return recipients;
+  }
+
+  /**
+   * @param recipients : map from recipient type to coma separated list of addresses for e.g. to->abc@xyz.com,def@xyz.com
+   */
+  public void setRecipients(Map<String, String> recipients)
+  {
+    this.recipients = recipients;
+    resetMessage();
+  }
+
+  @AssertTrue(message = "Please verify the recipients set")
+  private boolean isValid()
+  {
+    if (recipients.isEmpty()) {
+      return false;
+    }
+    for (Map.Entry<String, String> entry: recipients.entrySet()) {
+      if (entry.getKey().toUpperCase().equalsIgnoreCase(RecipientType.TO.toString())) {
+        if (entry.getValue() != null && entry.getValue().length() > 0) {
+          return true;
+        }
+        return false;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public void endWindow()
+  {
+    if (currentWindowId == largestRecoveryWindow) {
+      if (!waiting.isEmpty()) {
+        smtpSenderThread.messageRcvdQueue.addAll(waiting);
+      }
+    }
+    else if (currentWindowId > largestRecoveryWindow) {
+      try {
+        synchronized (sync) {
+          idempotentStorageManager.save(countMessageSent, operatorId, currentWindowId);
+          LOG.debug("idempotent manager saves count {}", countMessageSent);
+          countMessageSent = 0;
+        }
+      }
+      catch (IOException e) {
+        throw new RuntimeException("saving recovery", e);
+      }
+
+    }
+    if (!waiting.isEmpty()) {
+      while (!messagesSent.isEmpty()) {
+        waiting.remove(messagesSent.poll());
+      }
+    }
+  }
+
+  protected int restore(long windowId)
+  {
+    try {
+      Map<Integer, Object> recoveryDataPerOperator = idempotentStorageManager.load(windowId);
+      for (Object recovery: recoveryDataPerOperator.values()) {
+        countMessagesToBeSkipped = (Integer)recovery;
+      }
+    }
+    catch (IOException ex) {
+      throw new RuntimeException("replay", ex);
+    }
+    return countMessagesToBeSkipped;
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+  }
+
+  /**
+   * Sets the idempotent storage manager on the operator.
+   *
+   * @param idempotentStorageManager an {@link IdempotentStorageManager}
+   */
+  public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager)
+  {
+    this.idempotentStorageManager = idempotentStorageManager;
+  }
+
+  /**
+   * Returns the idempotent storage manager which is being used by the operator.
+   *
+   * @return the idempotent storage manager.
+   */
+  public IdempotentStorageManager getIdempotentStorageManager()
+  {
+    return idempotentStorageManager;
+  }
+
+  private class SMTPSenderThread implements Runnable
+  {
+    private transient final BlockingQueue<String> messageRcvdQueue;
+    private transient volatile boolean running;
+
+    SMTPSenderThread()
+    {
+      messageRcvdQueue = new LinkedBlockingQueue<String>();
+      Thread messageServiceThread = new Thread(this, "SMTPSenderThread");
+      messageServiceThread.start();
+    }
+
+    private void stopService() throws IOException
+    {
+      running = false;
+    }
+
+    @Override
+    public void run()
+    {
+      running = true;
+      while (running) {
+        String mailContent = messageRcvdQueue.poll();
+        if (mailContent != null) {
+          try {
+            message.setContent(mailContent, contentType);
+            synchronized (sync) {
+              Transport.send(message);
+            }
+          }
+          catch (MessagingException ex) {
+            running = false;
+            throwable.set(ex);
+          }
+          synchronized (sync) {
+            countMessageSent++;
+          }
+
+          LOG.debug("message is {}", message);
+          completed(mailContent);
+
+        }
+      }
+
+    }
+
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(SmtpIdempotentOutputOperator.class);
+
+}
diff --git a/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java
deleted file mode 100644
index dd8873c..0000000
--- a/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.lib.io;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.mail.Message;
-import javax.mail.internet.InternetAddress;
-import javax.mail.internet.MimeMessage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.StreamingApplication;
-import com.google.common.collect.Maps;
-import com.icegreen.greenmail.util.GreenMail;
-import com.icegreen.greenmail.util.ServerSetup;
-import com.icegreen.greenmail.util.ServerSetupTest;
-
-public class SmtpOutputOperatorTest
-{
-
-  String subject = "ALERT!";
-  String content = "This is an SMTP operator test {}";
-  String from = "jenkins@datatorrent.com";
-  String to = "jenkins@datatorrent.com";
-  String cc = "jenkinsCC@datatorrent.com";
-  GreenMail greenMail = null;
-  SmtpOutputOperator node;
-  Map<String, String> data;
-
-  @Before
-  public void setup() throws Exception
-  {
-    node = new SmtpOutputOperator();
-    greenMail = new GreenMail(ServerSetupTest.ALL);
-    greenMail.start();
-    node.setFrom(from);
-    node.setContent(content);
-    node.setSmtpHost("127.0.0.1");
-    node.setSmtpPort(ServerSetupTest.getPortOffset() + ServerSetup.SMTP.getPort());
-    node.setSmtpUserName(from);
-    node.setSmtpPassword("<password>");
-    //node.setUseSsl(true);
-    node.setSubject(subject);
-    data = new HashMap<String, String>();
-    data.put("alertkey", "alertvalue");
-
-  }
-
-  @After
-  public void teardown() throws Exception
-  {
-    node.teardown();
-    greenMail.stop();
-    Thread.sleep(1000);
-  }
-
-  @Test
-  public void testSmtpOutputNode() throws Exception
-  {
-    Map<String, String> recipients = Maps.newHashMap();
-    recipients.put("to", to + "," + cc);
-    recipients.put("cc", cc);
-    node.setRecipients(recipients);
-    node.setup(null);
-    node.beginWindow(1000);
-    node.input.process(data);
-    node.endWindow();
-    Assert.assertTrue(greenMail.waitForIncomingEmail(5000, 1));
-    MimeMessage[] messages = greenMail.getReceivedMessages();
-    Assert.assertEquals(3, messages.length);
-    String receivedContent = messages[0].getContent().toString().trim();
-    String expectedContent = content.replace("{}", data.toString()).trim();
-
-    Assert.assertTrue(expectedContent.equals(receivedContent));
-    Assert.assertEquals(from, ((InternetAddress) messages[0].getFrom()[0]).getAddress());
-    Assert.assertEquals(to, messages[0].getRecipients(Message.RecipientType.TO)[0].toString());
-    Assert.assertEquals(cc, messages[0].getRecipients(Message.RecipientType.TO)[1].toString());
-    Assert.assertEquals(cc, messages[0].getRecipients(Message.RecipientType.CC)[0].toString());
-
-  }
-
-  @Test
-  public void test() throws Exception
-  {
-    Map<String, String> recipients = Maps.newHashMap();
-    recipients.put("to", to);
-    node.setRecipients(recipients);
-    node.setup(null);
-    node.beginWindow(1000);
-    node.input.process(data);
-    node.endWindow();
-    Assert.assertTrue(greenMail.waitForIncomingEmail(5000, 1));
-    MimeMessage[] messages = greenMail.getReceivedMessages();
-    Assert.assertEquals(1, messages.length);
-    String receivedContent = messages[0].getContent().toString().trim();
-    String expectedContent = content.replace("{}", data.toString()).trim();
-
-    Assert.assertTrue(expectedContent.equals(receivedContent));
-    Assert.assertEquals(from, ((InternetAddress) messages[0].getFrom()[0]).getAddress());
-    Assert.assertEquals(to, messages[0].getAllRecipients()[0].toString());
-  }
-
-  @Test
-  public void testProperties() throws Exception
-  {
-    Configuration conf = new Configuration(false);
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.subject", subject);
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.content", content);
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.from", from);
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.smtpHost", "127.0.0.1");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.smtpUserName", from);
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.smtpPassword", "<password>");
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.recipients.TO", to + "," + cc);
-    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.recipients.CC", cc);
-
-    final AtomicReference<SmtpOutputOperator> o1 = new AtomicReference<SmtpOutputOperator>();
-    StreamingApplication app = new StreamingApplication() {
-      @Override
-      public void populateDAG(DAG dag, Configuration conf)
-      {
-        o1.set(dag.addOperator("o1", new SmtpOutputOperator()));
-      }
-    };
-
-    LocalMode lma = LocalMode.newInstance();
-    lma.prepareDAG(app, conf);
-    Assert.assertEquals("checking TO list", to + "," + cc, o1.get().getRecipients().get("TO"));
-    Assert.assertEquals("checking CC list", cc, o1.get().getRecipients().get("CC"));
-
-  }
-}
diff --git a/library/src/test/java/com/datatorrent/lib/io/smtp/SmtpOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/smtp/SmtpOutputOperatorTest.java
new file mode 100644
index 0000000..f00fce2
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/smtp/SmtpOutputOperatorTest.java
@@ -0,0 +1,295 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.io.smtp;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.mail.Message;
+import javax.mail.internet.InternetAddress;
+import javax.mail.internet.MimeMessage;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.google.common.collect.Maps;
+import com.icegreen.greenmail.util.GreenMail;
+import com.icegreen.greenmail.util.ServerSetup;
+import com.icegreen.greenmail.util.ServerSetupTest;
+
+import org.junit.*;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.util.TestUtils;
+
+import com.datatorrent.api.*;
+import java.util.logging.Level;
+
+public class SmtpOutputOperatorTest
+{
+
+  String subject = "ALERT!";
+  String content = "This is an SMTP operator test {}";
+  String from = "jenkins@datatorrent.com";
+  String to = "jenkins@datatorrent.com";
+  String cc = "jenkinsCC@datatorrent.com";
+  GreenMail greenMail = null;
+  SmtpIdempotentOutputOperator node;
+
+  Map<String, String> data;
+
+  public class TestMeta extends TestWatcher
+  {
+    public String dir = null;
+    Context.OperatorContext context;
+    IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager();
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      String methodName = description.getMethodName();
+      String className = description.getClassName();
+      this.dir = "target/" + className + "/" + methodName;
+      Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.DAGContext.APPLICATION_ID, "FileInputOperatorTest");
+      context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+      node = new SmtpIdempotentOutputOperator();
+      greenMail = new GreenMail(ServerSetupTest.ALL);
+      greenMail.start();
+      node.setFrom(from);
+      node.setContent(content);
+      node.setSmtpHost("127.0.0.1");
+      node.setSmtpPort(ServerSetupTest.getPortOffset() + ServerSetup.SMTP.getPort());
+      node.setSmtpUserName(from);
+      node.setSmtpPassword("<password>");
+      data = new HashMap<String, String>();
+      data.put("alertkey", "alertvalue");
+
+    manager.setRecoveryPath(testMeta.dir + "/recovery");
+    manager.setup(testMeta.context);
+    node.setIdempotentStorageManager(manager);
+
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      super.finished(description);
+      FileUtils.deleteQuietly(new File(this.dir));
+      greenMail.stop();
+      try {
+        Thread.sleep(1000);
+      }
+      catch (InterruptedException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+
+  @Test
+  public void testSmtpOutputNode() throws Exception
+  {
+    Map<String, String> recipients = Maps.newHashMap();
+    recipients.put("to", to + "," + cc);
+    recipients.put("cc", cc);
+    node.setRecipients(recipients);
+    node.setSubject("hello");
+    node.setup(testMeta.context);
+    node.beginWindow(0);
+    String data = "First test message";
+    node.input.process(data);
+    Thread.sleep(5000);
+    node.endWindow();
+    node.teardown();
+    Thread.sleep(5000);
+    Assert.assertTrue(greenMail.waitForIncomingEmail(5000, 1));
+    MimeMessage[] messages = greenMail.getReceivedMessages();
+    Assert.assertEquals(3, messages.length);
+    String receivedContent = messages[0].getContent().toString().trim();
+    String expectedContent = content.replace("{}", data.toString()).trim();
+
+    Assert.assertTrue(expectedContent.equals(receivedContent));
+    Assert.assertEquals(from, ((InternetAddress)messages[0].getFrom()[0]).getAddress());
+    Assert.assertEquals(to, messages[0].getRecipients(Message.RecipientType.TO)[0].toString());
+    Assert.assertEquals(cc, messages[0].getRecipients(Message.RecipientType.TO)[1].toString());
+    Assert.assertEquals(cc, messages[0].getRecipients(Message.RecipientType.CC)[0].toString());
+  }
+
+  @Test
+  public void test() throws Exception
+  {
+    Map<String, String> recipients = Maps.newHashMap();
+    recipients.put("to", to);
+    node.setRecipients(recipients);
+    node.setup(testMeta.context);
+    node.beginWindow(0);
+    String data = "First test message";
+    node.input.process(data);
+    node.endWindow();
+    node.teardown();
+    Thread.sleep(500);
+    Assert.assertTrue(greenMail.waitForIncomingEmail(5000, 1));
+    MimeMessage[] messages = greenMail.getReceivedMessages();
+    Assert.assertEquals(1, messages.length);
+    String receivedContent = messages[0].getContent().toString().trim();
+    String expectedContent = content.replace("{}", data.toString()).trim();
+
+    Assert.assertTrue(expectedContent.equals(receivedContent));
+    Assert.assertEquals(from, ((InternetAddress)messages[0].getFrom()[0]).getAddress());
+    Assert.assertEquals(to, messages[0].getAllRecipients()[0].toString());
+  }
+
+  @Test
+  public void testProperties() throws Exception
+  {
+    Configuration conf = new Configuration(false);
+    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.subject", subject);
+    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.content", content);
+    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.from", from);
+    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.smtpHost", "127.0.0.1");
+    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.smtpUserName", from);
+    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.smtpPassword", "<password>");
+    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.recipients.TO", to + "," + cc);
+    conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.recipients.CC", cc);
+
+    final AtomicReference<SmtpIdempotentOutputOperator> o1 = new AtomicReference<SmtpIdempotentOutputOperator>();
+    StreamingApplication app = new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+        o1.set(dag.addOperator("o1", new SmtpIdempotentOutputOperator()));
+      }
+
+    };
+
+    LocalMode lma = LocalMode.newInstance();
+    lma.prepareDAG(app, conf);
+    Assert.assertEquals("checking TO list", to + "," + cc, o1.get().getRecipients().get("TO"));
+    Assert.assertEquals("checking CC list", cc, o1.get().getRecipients().get("CC"));
+  }
+
+  @Test
+  public void testIdempotency() throws Exception
+  {
+    Map<String, String> recipients = Maps.newHashMap();
+
+    recipients.put("to", to);
+    node.setup(testMeta.context);
+    node.setRecipients(recipients);
+    node.setSubject("hello");
+    for (long wid = 1; wid < 10; wid++) {
+      node.beginWindow(wid);
+      String input = wid + "test message";
+      node.input.process(input);
+      if(wid == 5 || wid == 6 || wid == 7){
+      input = wid +"hello"+"test message";
+       node.input.process(input);
+      }
+      node.endWindow();
+      Thread.sleep(10000);
+      if (wid == 7) {
+        SmtpIdempotentOutputOperator newOp = TestUtils.clone(new Kryo(), node);
+        node.teardown();
+        newOp.setup(testMeta.context);
+        newOp.beginWindow(5);
+        String inputTest = 5 + "test message";
+        newOp.input.process(inputTest);
+        inputTest = 5 +"hello"+"test message";
+        newOp.endWindow();
+        Thread.sleep(5000);
+
+        newOp.beginWindow(6);
+        inputTest = 6 + "test message";
+        newOp.input.process(inputTest);
+        inputTest = 6 +"hello"+"test message";
+        newOp.input.process(inputTest);
+        newOp.endWindow();
+        Thread.sleep(5000);
+
+        newOp.beginWindow(7);
+        inputTest = 7 + "test message";
+        newOp.input.process(inputTest);
+        inputTest = 7 +"hello"+"test message";
+        newOp.input.process(inputTest);
+        newOp.endWindow();
+        Thread.sleep(5000);
+        //normal processing
+        newOp.beginWindow(8);
+        inputTest = 8 + "test message";
+        newOp.input.process(inputTest);
+        newOp.endWindow();
+        Thread.sleep(10000);
+        newOp.teardown();
+        break;
+      }
+    }
+    Assert.assertTrue(greenMail.waitForIncomingEmail(5000, 1));
+
+    MimeMessage[] messages = greenMail.getReceivedMessages();
+    Assert.assertEquals(11, messages.length);
+    for (int i = 0; i < 5; i++) {
+      String receivedContent = messages[i].getContent().toString().trim();
+      logger.debug("received content is {}", receivedContent);
+      String expectedContent = content.replace("{}", i+1 + "test message").trim();
+      Assert.assertTrue(expectedContent.equals(receivedContent));
+    }
+
+    String receivedContent = messages[5].getContent().toString().trim();
+    logger.debug("received content is {}", receivedContent);
+    String expectedContent = content.replace("{}", 5 + "hellotest message").trim();
+    Assert.assertTrue(expectedContent.equals(receivedContent));
+    receivedContent = messages[6].getContent().toString().trim();
+    logger.debug("received content is {}", receivedContent);
+    expectedContent = content.replace("{}", 6 + "test message").trim();
+    Assert.assertTrue(expectedContent.equals(receivedContent));
+    receivedContent = messages[7].getContent().toString().trim();
+    logger.debug("received content is {}", receivedContent);
+    expectedContent = content.replace("{}", 6 + "hellotest message").trim();
+    Assert.assertTrue(expectedContent.equals(receivedContent));
+    receivedContent = messages[8].getContent().toString().trim();
+    logger.debug("received content is {}", receivedContent);
+    expectedContent = content.replace("{}", 7 + "test message").trim();
+    Assert.assertTrue(expectedContent.equals(receivedContent));
+    receivedContent = messages[9].getContent().toString().trim();
+    logger.debug("received content is {}", receivedContent);
+    expectedContent = content.replace("{}", 7 + "hellotest message").trim();
+    Assert.assertTrue(expectedContent.equals(receivedContent));
+    receivedContent = messages[10].getContent().toString().trim();
+    logger.debug("received content is {}", receivedContent);
+    expectedContent = content.replace("{}", 8 + "test message").trim();
+    Assert.assertTrue(expectedContent.equals(receivedContent));
+
+    Assert.assertEquals(from, ((InternetAddress)messages[0].getFrom()[0]).getAddress());
+    Assert.assertEquals(to, messages[0].getRecipients(Message.RecipientType.TO)[0].toString());
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(SmtpOutputOperatorTest.class);
+}
\ No newline at end of file