Merge branch 'master' into release-1.0.2

Conflicts:
	contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java
	library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java
index a3750c0..4989709 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java
@@ -58,7 +58,7 @@
 	 */
 	@Nonnull
 	protected abstract PreparedStatement getUpdateCommand();
-	 
+
 	@Override
 	public void setup(Context.OperatorContext context)
 	{
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractExactlyOnceKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractExactlyOnceKafkaOutputOperator.java
new file mode 100644
index 0000000..dacfef8
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractExactlyOnceKafkaOutputOperator.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.kafka;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+import kafka.producer.KeyedMessage;
+import kafka.producer.Partitioner;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.common.util.Pair;
+import com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator;
+import com.google.common.collect.Sets;
+
+
+/**
+ * Kafka output operator which, in most cases, sends each tuple to the Kafka message queue only once.<br>
+ * Assuming the messages kept in Kafka are ordered by key, by value, or by the key/value pair
+ * (for example, using a timestamp as the key), this output operator reads the last message of each partition at startup
+ * and uses it as the initial offset, so that replayed messages are not sent to Kafka again.
+ *
+ * This is not a perfect "exactly once" guarantee in 2 cases:
+ * 1. Multiple producers write messages to the same Kafka partition.
+ * 2. The operator is restarted after a message was sent out but before Kafka had synchronized it among all brokers.
+ *
+ * <br>
+ * Ports:<br>
+ * <b>Input</b>: One input port<br>
+ * <b>Output</b>: No output port<br>
+ * <br>
+ * Properties:<br>
+ * configProperties<br>
+ * <br>
+ * Compile time checks:<br>
+ * Classes derived from this operator have to implement 2 methods:<br>
+ * tupleToKeyValue() to convert input tuples to Kafka key/value objects<br>
+ * compareToLastMsg() to compare an incoming tuple with the last message received in Kafka so that the operator can skip tuples that were already sent<br>
+ * <br>
+ * Run time checks:<br>
+ * None<br>
+ * <br>
+ * Benchmarks:<br>
+ * TBD<br>
+ * <br>
+ *
+ * @since 1.0.2
+ */
+
+public abstract class AbstractExactlyOnceKafkaOutputOperator<T, K, V> extends AbstractKafkaOutputOperator<K, V>
+{
+
+  private Map<Integer, Pair<byte[], byte[]>> lastMsgs;
+
+  private transient Partitioner partitioner;
+
+  private transient int partitionNum = 1;
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    try {
+      String className = (String) getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_PARTITIONER);
+      if (className != null) {
+        partitioner = (Partitioner) Class.forName(className).newInstance();
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to initialize partitioner", e);
+    }
+    // read the last message of each partition from Kafka
+    initializeLastProcessingOffset();
+  }
+
+  public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>() {
+    @Override
+    public void process(T tuple)
+    {
+      Pair<K, V> keyValue = tupleToKeyValue(tuple);
+      int pid = 0;
+
+      if (partitioner != null) {
+        pid = partitioner.partition(keyValue.first, partitionNum);
+      }
+
+      Pair<byte[], byte[]> lastMsg = lastMsgs.get(pid);
+
+      if (lastMsg == null || compareToLastMsg(keyValue, lastMsg) > 0) {
+        getProducer().send(new KeyedMessage<K, V>(getTopic(), keyValue.first, keyValue.second));
+        logger.debug("Sent out message {}", keyValue.second);
+        sendCount++;
+      } else {
+        // ignore the tuple because Kafka already has it
+        logger.debug("Ignored tuple {}", tuple);
+      }
+    }
+  };
+
+  private void initializeLastProcessingOffset()
+  {
+
+    // read last received kafka message
+    TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(Sets.newHashSet((String)getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST)), this.getTopic());
+
+    if (tm == null) {
+      throw new RuntimeException("Failed to retrieve topic metadata");
+    }
+
+    partitionNum = tm.partitionsMetadata().size();
+
+    lastMsgs = new HashMap<Integer, Pair<byte[],byte[]>>(partitionNum);
+
+    for (PartitionMetadata pm : tm.partitionsMetadata()) {
+
+      String leadBroker = pm.leader().host();
+      int port = pm.leader().port();
+      String clientName = this.getClass().getName().replace('$', '.') + "_Client_" + tm.topic() + "_" + pm.partitionId();
+      SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
+
+      long readOffset = KafkaMetadataUtil.getLastOffset(consumer, tm.topic(), pm.partitionId(), kafka.api.OffsetRequest.LatestTime(), clientName);
+
+      // empty partition: nothing has been produced yet, so there is no last message to compare against
+      if (readOffset <= 0) {
+        continue;
+      }
+
+      // fetch the last message of the partition (readOffset points just past it)
+      FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(tm.topic(), pm.partitionId(), readOffset - 1, 100000).build();
+
+      FetchResponse fetchResponse = consumer.fetch(req);
+      for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(tm.topic(), pm.partitionId())) {
+
+        Message m = messageAndOffset.message();
+
+        ByteBuffer payload = m.payload();
+        ByteBuffer key = m.key();
+        byte[] valueBytes = new byte[payload.limit()];
+        byte[] keyBytes = new byte[key.limit()];
+        payload.get(valueBytes);
+        key.get(keyBytes);
+        lastMsgs.put(pm.partitionId(), new Pair<byte[], byte[]>(keyBytes, valueBytes));
+      }
+
+    }
+
+  }
+
+  /**
+   * Compare the incoming tuple with the last message received in Kafka.
+   *
+   * @param tupleKeyValue the key/value pair converted from the incoming tuple
+   * @param lastReceivedKeyValue the key/value pair of the last message found in Kafka
+   * @return a value &lt;= 0 if tupleKeyValue is supposed to come before lastReceivedKeyValue,
+   *         &gt; 0 if tupleKeyValue comes after lastReceivedKeyValue
+   */
+  protected abstract int compareToLastMsg(Pair<K, V> tupleKeyValue, Pair<byte[], byte[]> lastReceivedKeyValue);
+
+  /**
+   * Tell the operator how to convert an input tuple to a Kafka key/value pair.
+   *
+   * @param tuple the incoming tuple
+   * @return the key/value pair to send to Kafka
+   */
+  protected abstract Pair<K, V> tupleToKeyValue(T tuple);
+
+  private static final Logger logger = LoggerFactory.getLogger(AbstractExactlyOnceKafkaOutputOperator.class);
+
+}
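For reference, below is a minimal usage sketch of a concrete subclass (not part of this change set). It assumes tuples of the form "<sequence>###<payload>" keyed by a monotonically increasing sequence number; the package, class names, broker address, and partitioner class are illustrative placeholders only.

package com.example;

import java.util.Properties;

import com.datatorrent.common.util.Pair;
import com.datatorrent.contrib.kafka.AbstractExactlyOnceKafkaOutputOperator;
import com.datatorrent.contrib.kafka.KafkaMetadataUtil;

// Illustrative subclass only: assumes tuples look like "<sequence>###<payload>" and are keyed by sequence number.
public class SequenceKeyedKafkaOutputOperator extends AbstractExactlyOnceKafkaOutputOperator<String, String, String>
{
  @Override
  protected Pair<String, String> tupleToKeyValue(String tuple)
  {
    // split into the sequence number (key) and the payload (value)
    String[] parts = tuple.split("###", 2);
    return new Pair<String, String>(parts[0], parts[1]);
  }

  @Override
  protected int compareToLastMsg(Pair<String, String> tupleKeyValue, Pair<byte[], byte[]> lastReceivedKeyValue)
  {
    // a positive result means the tuple is newer than the last message already in Kafka, so it will be sent
    return Integer.parseInt(tupleKeyValue.first) - Integer.parseInt(new String(lastReceivedKeyValue.first));
  }

  // Example producer properties; the broker list and partitioner class are placeholders.
  public static Properties exampleProperties()
  {
    Properties props = new Properties();
    props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
    // also used by setup() to read the last message of every partition
    props.setProperty(KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST, "localhost:9092");
    // optional: without partitioner.class, all tuples are compared against partition 0
    props.setProperty(KafkaMetadataUtil.PRODUCER_PROP_PARTITIONER, "com.example.SequenceKeyPartitioner");
    return props;
  }
}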
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
index decbfa4..846b0ef 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
@@ -41,6 +41,10 @@
  */
 public class KafkaMetadataUtil
 {
+
+  public static final String PRODUCER_PROP_PARTITIONER = "partitioner.class";
+
+  public static final String PRODUCER_PROP_BROKERLIST = "metadata.broker.list";
 
   private static Logger logger = LoggerFactory.getLogger(KafkaMetadataUtil.class);
 
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaExactlyOnceOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaExactlyOnceOutputOperatorTest.java
new file mode 100644
index 0000000..81e9fdd
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaExactlyOnceOutputOperatorTest.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.kafka;
+
+import com.datatorrent.api.ActivationListener;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.common.util.Pair;
+
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class KafkaExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase
+{
+  private static final Logger logger = LoggerFactory.getLogger(KafkaExactlyOnceOutputOperatorTest.class);
+  private static final int maxTuple = 40;
+  private static CountDownLatch latch;
+  private static boolean isRestarted = false;
+
+  /**
+   * Tuple generator for testing.
+   */
+  public static class StringGeneratorInputOperator implements InputOperator, ActivationListener<OperatorContext>
+  {
+    public final transient DefaultOutputPort<String> outputPort = new DefaultOutputPort<String>();
+    private final transient ArrayBlockingQueue<String> stringBuffer = new ArrayBlockingQueue<String>(1024);
+    private volatile Thread dataGeneratorThread;
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+    }
+
+    @Override
+    public void endWindow()
+    {
+    }
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+      
+    }
+
+    @Override
+    public void teardown()
+    {
+    }
+
+    @Override
+    public void activate(OperatorContext ctx)
+    {
+      dataGeneratorThread = new Thread("String Generator")
+      {
+        @Override
+        public void run()
+        {
+          try {
+            int i = 0;
+            while (dataGeneratorThread != null && i < maxTuple) {
+              stringBuffer.put((++i) + "###testString " + i);
+            }
+            stringBuffer.put((maxTuple + 1) + "###" + KafkaOperatorTestBase.END_TUPLE);
+          }
+          catch (InterruptedException ie) {
+          }
+        }
+      };
+      dataGeneratorThread.start();
+    }
+
+    @Override
+    public void deactivate()
+    {
+      dataGeneratorThread = null;
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      for (int i = stringBuffer.size(); i-- > 0;) {
+        if (i == 20 && !isRestarted) {
+          isRestarted = true;
+          // fail the operator; when it comes back up, everything is resent
+          throw new RuntimeException("Simulated operator failure to test exactly-once resend");
+        }
+        outputPort.emit(stringBuffer.poll());
+      }
+    }
+  } // End of StringGeneratorInputOperator
+
+  /**
+   * Test AbstractExactlyOnceKafkaOutputOperator (i.e. an output adapter for Kafka, aka producer).
+   * This module sends data into a Kafka message bus.
+   *
+   * [Generate tuples] ==> [send tuples through the Kafka output adapter (i.e. producer) into the Kafka message bus, failing the producer at a certain point and bringing it back]
+   * ==> [receive data in an outside Kafka listener (i.e. consumer)] ==> verify that Kafka does not receive duplicate messages
+   *
+   * @throws Exception
+   */
+  @Test
+  @SuppressWarnings({"rawtypes"})
+  public void testKafkaExactOnceOutputOperator() throws Exception
+  {
+    //initialize the latch to synchronize the threads
+    latch = new CountDownLatch(maxTuple);
+    // Setup a message listener to receive the message
+    KafkaTestConsumer listener = new KafkaTestConsumer("topic1");
+    listener.setLatch(latch);
+
+    // Malhar module to send message
+    // Create DAG for testing.
+    LocalMode lma = LocalMode.newInstance();
+    final DAG dag = lma.getDAG();
+
+    StringGeneratorInputOperator generator = dag.addOperator("TestStringGenerator", StringGeneratorInputOperator.class);
+    final SimpleKafkaExactOnceOutputOperator node = dag.addOperator("Kafka message producer", SimpleKafkaExactOnceOutputOperator.class);
+    
+    Properties props = new Properties();
+    props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
+    props.put("metadata.broker.list", "localhost:9092");
+    props.setProperty("producer.type", "async");
+    props.setProperty("queue.buffering.max.ms", "200");
+    props.setProperty("queue.buffering.max.messages", "10");
+    props.setProperty("batch.num.messages", "5");
+    
+    node.setConfigProperties(props);
+    // Set configuration parameters for Kafka
+    node.setTopic("topic1");
+
+    // Connect ports
+    dag.addStream("Kafka message", generator.outputPort, node.inputPort).setLocality(Locality.CONTAINER_LOCAL);
+
+    
+    // Create local cluster
+    final LocalMode.Controller lc = lma.getController();
+    lc.runAsync();
+
+    Future f = Executors.newFixedThreadPool(1).submit(listener);
+    f.get(30, TimeUnit.SECONDS);
+    
+    lc.shutdown();
+
+    // Check values send vs received
+    Assert.assertEquals("Number of emitted tuples", maxTuple, listener.holdingBuffer.size());
+    logger.debug(String.format("Number of emitted tuples: %d", listener.holdingBuffer.size()));
+    Assert.assertEquals("First tuple", "testString 1", listener.getMessage(listener.holdingBuffer.peek()));
+
+    listener.close();
+    
+  }
+  
+  public static class SimpleKafkaExactOnceOutputOperator extends AbstractExactlyOnceKafkaOutputOperator<String, String, String> {
+
+    @Override
+    protected int compareToLastMsg(Pair<String, String> tupleKeyValue, Pair<byte[], byte[]> lastReceivedKeyValue)
+    {
+      return Integer.parseInt(tupleKeyValue.first) - Integer.parseInt(new String(lastReceivedKeyValue.first));
+    }
+
+    @Override
+    protected Pair<String, String> tupleToKeyValue(String tuple)
+    {
+      return new Pair<String, String>(tuple.split("###")[0], tuple.split("###")[1]);
+    }
+    
+  }
+
+
+}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
index b78314a..f1b5013 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
@@ -21,11 +21,16 @@
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+
 import java.util.Properties;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+
 import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,7 +44,7 @@
   private static int tupleCount = 0;
   private static final int maxTuple = 20;
   private static CountDownLatch latch;
-  
+
    /**
    * Tuple generator for testing.
    */
@@ -128,30 +133,41 @@
     listener.setLatch(latch);
     new Thread(listener).start();
 
-    // Malhar module to send message
     // Create DAG for testing.
     LocalMode lma = LocalMode.newInstance();
+
+    StreamingApplication app = new StreamingApplication() {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+      }
+    };
+
     DAG dag = lma.getDAG();
 
     // Create ActiveMQStringSinglePortOutputOperator
     StringGeneratorInputOperator generator = dag.addOperator("TestStringGenerator", StringGeneratorInputOperator.class);
-    KafkaSinglePortOutputOperator node = dag.addOperator("Kafka message producer", KafkaSinglePortOutputOperator.class);
-    
+    KafkaSinglePortOutputOperator node = dag.addOperator("KafkaMessageProducer", KafkaSinglePortOutputOperator.class);
+
     Properties props = new Properties();
     props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
-    props.put("metadata.broker.list", "localhost:9092");
+    props.put("metadata.broker.list", "invalidhost:9092");
     props.setProperty("producer.type", "async");
     props.setProperty("queue.buffering.max.ms", "200");
     props.setProperty("queue.buffering.max.messages", "10");
     props.setProperty("batch.num.messages", "5");
-    
+
     node.setConfigProperties(props);
-    // Set configuration parameters for Kafka
     node.setTopic("topic1");
 
     // Connect ports
     dag.addStream("Kafka message", generator.outputPort, node.inputPort).setLocality(Locality.CONTAINER_LOCAL);
 
+    // MLHR-1143: verify we can set broker list (and other properties) through configuration
+    Configuration conf = new Configuration(false);
+    conf.set("dt.operator.KafkaMessageProducer.prop.configProperties(metadata.broker.list)", "localhost:9092");
+    lma.prepareDAG(app, conf);
+
     // Create local cluster
     final LocalMode.Controller lc = lma.getController();
     lc.runAsync();
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestConsumer.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestConsumer.java
index a2fa664..a276c14 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestConsumer.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestConsumer.java
@@ -108,7 +108,7 @@
         latch.countDown();
       }
       if(getMessage(msg).equals(KafkaOperatorTestBase.END_TUPLE)){
-        return;
+        break;
       }
       holdingBuffer.add(msg);
       receiveCount++;
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
index cc3b593..0d64922 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
@@ -56,6 +56,9 @@
  * <p/>
  * Supports partitioning and dynamic changes to number of partitions through property {@link #partitionCount}. The
  * directory scanner is responsible to only accept the files that belong to a partition.
+ * <p/>
+ * This class supports retrying of failed files by putting them into a failed list and retrying them after the pending
+ * files are processed. Retrying is disabled when maxRetryCount is set to zero.
  *
  * @since 1.0.2
  */
@@ -74,7 +77,52 @@
   private int emitBatchSize = 1000;
   private int currentPartitions = 1 ;
   private int partitionCount = 1;
+  private int retryCount = 0;
+  private int maxRetryCount = 5;
+  protected transient int skipCount = 0;
 
+  /**
+   * Class representing a failed file. When a read fails in the middle of a file, the file is
+   * added to the failed list along with the last read offset.
+   * Files from the failed list are processed after all pending files are processed, but
+   * before checking for new files.
+   * A failed file is retried up to maxRetryCount times, after which it is ignored.
+   */
+  protected static class FailedFile {
+    String path;
+    int offset;
+    int retryCount;
+    long lastFailedTime;
+
+    /* For kryo serialization */
+    protected FailedFile() {}
+
+    protected FailedFile(String path, int offset) {
+      this.path = path;
+      this.offset = offset;
+      this.retryCount = 0;
+    }
+
+    protected FailedFile(String path, int offset, int retryCount) {
+      this.path = path;
+      this.offset = offset;
+      this.retryCount = retryCount;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "FailedFile[" +
+          "path='" + path + '\'' +
+          ", offset=" + offset +
+          ", retryCount=" + retryCount +
+          ", lastFailedTime=" + lastFailedTime +
+          ']';
+    }
+  }
+  /* List of failed files */
+  private final Queue<FailedFile> failedFiles = new LinkedList<FailedFile>();
 
   protected transient FileSystem fs;
   protected transient Configuration configuration;
@@ -146,7 +194,8 @@
       configuration = new Configuration();
       fs = FileSystem.newInstance(filePath.toUri(), configuration);
       if (currentFile != null && offset > 0) {
-        LOG.info("Continue reading {} from index {}", currentFile, offset);
+        long startTime = System.currentTimeMillis();
+        LOG.info("Continue reading {} from index {} time={}", currentFile, offset, startTime);
         int index = offset;
         this.inputStream = openFile(new Path(currentFile));
         // fast forward to previous offset
@@ -154,6 +203,7 @@
           readEntity();
           offset++;
         }
+        LOG.info("Read offset={} records in setup time={}", offset, System.currentTimeMillis() - startTime);
       }
     }
     catch (IOException ex) {
@@ -188,21 +238,25 @@
   final public void emitTuples()
   {
     if (inputStream == null) {
-      if (pendingFiles.isEmpty()) {
+      if (pendingFiles.isEmpty() && failedFiles.isEmpty()) {
         if (System.currentTimeMillis() - scanIntervalMillis > lastScanMillis) {
           pendingFiles = scanner.scan(fs, filePath, processedFiles);
           lastScanMillis = System.currentTimeMillis();
         }
       }
-      if (!pendingFiles.isEmpty())
-      {
-        Path path = pendingFiles.iterator().next();
-        pendingFiles.remove(path);
-        try {
+
+      try {
+        if (!pendingFiles.isEmpty()) {
+          Path path = pendingFiles.iterator().next();
+          pendingFiles.remove(path);
           this.inputStream = openFile(path);
-        } catch (Exception ex) {
-          throw new RuntimeException(ex);
         }
+        else if (!failedFiles.isEmpty()) {
+          FailedFile ff = failedFiles.poll();
+          this.inputStream = retryFailedFile(ff);
+        }
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
       }
     }
 
@@ -216,8 +270,17 @@
             closeFile(inputStream);
             break;
           }
-          offset++;
-          emit(line);
+
+          /* If skipCount is non-zero, recovery of a failed file is in progress; skipCount
+           * prevents records that were already emitted from being emitted again during recovery.
+           * When a failed file is reopened, skipCount is set to the last read offset for that file.
+           */
+          if (skipCount == 0) {
+            offset++;
+            emit(line);
+          }
+          else {
+            skipCount--;
+          }
         }
       } catch (IOException e) {
         LOG.error("FS reader error", e);
@@ -226,20 +289,61 @@
     }
   }
 
+  protected void addToFailedList() throws IOException {
+
+    FailedFile ff = new FailedFile(currentFile, offset, retryCount);
+
+    // try to close file
+    if (this.inputStream != null)
+      this.inputStream.close();
+
+    ff.retryCount ++;
+    ff.lastFailedTime = System.currentTimeMillis();
+    ff.offset = this.offset;
+
+    // Clear current file state.
+    this.currentFile = null;
+    this.inputStream = null;
+    this.offset = 0;
+
+    if (ff.retryCount > maxRetryCount)
+      return;
+
+    LOG.info("adding to failed list path {} offset {} retry {}", ff.path, ff.offset, ff.retryCount);
+    failedFiles.add(ff);
+  }
+
+  protected InputStream retryFailedFile(FailedFile ff)  throws IOException
+  {
+    LOG.info("retrying failed file {} offset {} retry {}", ff.path, ff.offset, ff.retryCount);
+    String path = ff.path;
+    this.inputStream = openFile(new Path(path));
+    this.offset = ff.offset;
+    this.retryCount = ff.retryCount;
+    this.skipCount = ff.offset;
+    return this.inputStream;
+  }
+
   protected InputStream openFile(Path path) throws IOException
   {
     LOG.info("opening file {}", path);
     InputStream input = fs.open(path);
     currentFile = path.toString();
     offset = 0;
+    retryCount = 0;
+    skipCount = 0;
     return input;
   }
 
   protected void closeFile(InputStream is) throws IOException
   {
-    is.close();
-    is = null;
-    processedFiles.add(currentFile);
+    LOG.info("closing file {} offset {}", currentFile, offset);
+
+    if (is != null)
+      is.close();
+    if (currentFile != null)
+      processedFiles.add(currentFile);
+
     currentFile = null;
     inputStream = null;
   }
@@ -267,9 +371,11 @@
     Set<String> totalProcessedFiles = new HashSet<String>();
     List<Pair<String, Integer>> currentFiles = new ArrayList<Pair<String, Integer>>();
     List<DirectoryScanner> oldscanners = new LinkedList<DirectoryScanner>();
+    List<FailedFile> totalFailedFiles = new LinkedList<FailedFile>();
     for(Partition<AbstractFSDirectoryInputOperator<T>> partition : partitions) {
       AbstractFSDirectoryInputOperator<T> oper = partition.getPartitionedInstance();
       totalProcessedFiles.addAll(oper.processedFiles);
+      totalFailedFiles.addAll(oper.failedFiles);
       if (oper.currentFile != null)
         currentFiles.add(new Pair<String, Integer>(oper.currentFile, oper.offset));
       oldscanners.add(oper.getScanner());
@@ -288,7 +394,7 @@
       DirectoryScanner scn = scanners.get(i);
       oper.setScanner(scn);
 
-      // Do state transfer
+      // Do state transfer for processed files.
       oper.processedFiles.addAll(totalProcessedFiles);
 
       /* set current scanning directory and offset */
@@ -302,6 +408,17 @@
         }
       }
 
+      /* transfer failed files */
+      oper.failedFiles.clear();
+      Iterator<FailedFile> iter = totalFailedFiles.iterator();
+      while (iter.hasNext()) {
+        FailedFile ff = iter.next();
+        if (scn.acceptFile(ff.path)) {
+          oper.failedFiles.add(ff);
+          iter.remove();
+        }
+      }
+
       newPartitions.add(new DefaultPartition<AbstractFSDirectoryInputOperator<T>>(oper));
     }
 
@@ -344,6 +461,16 @@
     return res;
   }
 
+  public int getMaxRetryCount()
+  {
+    return maxRetryCount;
+  }
+
+  public void setMaxRetryCount(int maxRetryCount)
+  {
+    this.maxRetryCount = maxRetryCount;
+  }
+
   public static class DirectoryScanner implements Serializable
   {
     private static final long serialVersionUID = 4535844463258899929L;
@@ -351,7 +478,7 @@
     private transient Pattern regex = null;
     private int partitionIndex;
     private int partitionCount;
-    protected final transient HashSet<String> ignoredFiles = new HashSet<String>();
+    private final transient HashSet<String> ignoredFiles = new HashSet<String>();
 
     public String getFilePatternRegexp()
     {
@@ -364,10 +491,24 @@
       this.regex = null;
     }
 
+    public Pattern getRegex() {
+      if (this.regex == null && this.filePatternRegexp != null)
+        this.regex = Pattern.compile(this.filePatternRegexp);
+      return this.regex;
+    }
+
+    public int getPartitionCount() {
+      return partitionCount;
+    }
+
+    public int getPartitionIndex() {
+      return partitionIndex;
+    }
+
     public LinkedHashSet<Path> scan(FileSystem fs, Path filePath, Set<String> consumedFiles)
     {
       if (filePatternRegexp != null && this.regex == null) {
-         this.regex = Pattern.compile(this.filePatternRegexp);
+        this.regex = Pattern.compile(this.filePatternRegexp);
       }
 
       LinkedHashSet<Path> pathSet = Sets.newLinkedHashSet();
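For reference, below is a minimal sketch (not part of this change set) of a line-oriented reader that uses the new failed-file handling: when a read throws, addToFailedList() parks the file with its current offset so it is retried later, up to maxRetryCount times, with skipCount suppressing re-emission of records that were already sent. The package, class, and port names are illustrative placeholders only.

package com.example;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.apache.hadoop.fs.Path;

import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.io.fs.AbstractFSDirectoryInputOperator;

// Illustrative subclass only: reads text files line by line and parks a file on read failure.
public class RetryingLineInputOperator extends AbstractFSDirectoryInputOperator<String>
{
  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();

  private transient BufferedReader reader;

  @Override
  protected InputStream openFile(Path path) throws IOException
  {
    InputStream is = super.openFile(path);
    reader = new BufferedReader(new InputStreamReader(is));
    return is;
  }

  @Override
  protected void closeFile(InputStream is) throws IOException
  {
    super.closeFile(is);
    if (reader != null) {
      reader.close();
      reader = null;
    }
  }

  @Override
  protected String readEntity() throws IOException
  {
    try {
      return reader.readLine();
    } catch (IOException e) {
      // park the file with its current offset; it will be retried up to maxRetryCount times
      addToFailedList();
      return null;
    }
  }

  @Override
  protected void emit(String tuple)
  {
    output.emit(tuple);
  }
}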
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperatorFailureHandlingTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperatorFailureHandlingTest.java
new file mode 100644
index 0000000..f472dc8
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperatorFailureHandlingTest.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import com.datatorrent.api.*;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.google.common.collect.*;
+import java.io.*;
+import java.util.*;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.*;
+import org.junit.*;
+import org.junit.rules.TestWatcher;
+
+public class AbstractFSDirectoryInputOperatorFailureHandlingTest
+{
+  public static class TestMeta extends TestWatcher
+  {
+    public String dir = null;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      String methodName = description.getMethodName();
+      String className = description.getClassName();
+      this.dir = "target/" + className + "/" + methodName;
+    }
+  };
+
+  @Rule public TestMeta testMeta = new TestMeta();
+
+  public static class TestFSDirectoryInputOperator extends AbstractFSDirectoryInputOperator<String>
+  {
+    @OutputPortFieldAnnotation(name = "output")
+    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
+    private transient BufferedReader br = null;
+    int count = 0;
+
+    @Override
+    protected InputStream openFile(Path path) throws IOException
+    {
+      InputStream is = super.openFile(path);
+      br = new BufferedReader(new InputStreamReader(is));
+      count = 0;
+      return is;
+    }
+
+    @Override
+    protected void closeFile(InputStream is) throws IOException
+    {
+      super.closeFile(is);
+      br.close();
+      count = 0;
+      br = null;
+    }
+
+    @Override protected InputStream retryFailedFile(FailedFile ff) throws IOException
+    {
+      count = 0;
+      return super.retryFailedFile(ff);
+    }
+
+    @Override
+    protected String readEntity() throws IOException
+    {
+      if (count != 0 && (count % 4) == 0) {
+        addToFailedList();
+        return null;
+      }
+      return br.readLine();
+    }
+
+    @Override
+    protected void emit(String tuple)
+    {
+      output.emit(tuple);
+      count++;
+    }
+  }
+
+  @Test
+  public void testFailureHandling() throws Exception
+  {
+    FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+    HashSet<String> allLines = Sets.newHashSet();
+    // Create 10 files with 10 records each (100 records total).
+    for (int file=0; file<10; file++) {
+      HashSet<String> lines = Sets.newHashSet();
+      for (int line=0; line<10; line++) {
+        lines.add("f"+file+"l"+line);
+      }
+      allLines.addAll(lines);
+      FileUtils.write(new File(testMeta.dir, "file"+file), StringUtils.join(lines, '\n'));
+    }
+
+    Thread.sleep(10);
+
+    TestFSDirectoryInputOperator oper = new TestFSDirectoryInputOperator();
+
+    CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+    oper.output.setSink(sink);
+
+    oper.setDirectory(testMeta.dir);
+    oper.getScanner().setFilePatternRegexp(".*file[\\d]");
+
+    oper.setup(null);
+    for (long wid=0; wid<1000; wid++) {
+      oper.beginWindow(wid);
+      oper.emitTuples();
+      oper.endWindow();
+    }
+    oper.teardown();
+
+    Assert.assertEquals("number tuples", 100, queryResults.collectedTuples.size());
+    Assert.assertEquals("lines", allLines, new HashSet<String>(queryResults.collectedTuples));
+
+  }
+}