Merge branch 'master' into release-1.0.2
Conflicts:
contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java
library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java
index a3750c0..4989709 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java
@@ -58,7 +58,7 @@
*/
@Nonnull
protected abstract PreparedStatement getUpdateCommand();
-
+
@Override
public void setup(Context.OperatorContext context)
{
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractExactlyOnceKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractExactlyOnceKafkaOutputOperator.java
new file mode 100644
index 0000000..dacfef8
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractExactlyOnceKafkaOutputOperator.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.kafka;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+import kafka.producer.KeyedMessage;
+import kafka.producer.Partitioner;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.common.util.Pair;
+import com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator;
+import com.google.common.collect.Sets;
+
+
+/**
+ * Kafka output operator that, in most cases, guarantees tuples are sent to the Kafka MQ only once.<br>
+ * Assuming the messages kept in Kafka are ordered by key, by value, or by the key/value pair
+ * (for example, using a timestamp as the key), this Kafka output operator always retrieves the last message
+ * from the MQ to determine the initial offset, so replayed messages are not sent to Kafka again.
+ *
+ * This is not "perfect exactly-once" in 2 cases:
+ * 1. Multiple producers produce messages to the same Kafka partition.
+ * 2. The same message was sent out and, before Kafka synchronized it among all the brokers, the operator was
+ * restarted.
+ *
+ * <br>
+ * Ports:<br>
+ * <b>Input</b>: One input port<br>
+ * <b>Output</b>: No output port<br>
+ * <br>
+ * Properties:<br>
+ * configProperties<br>
+ * <br>
+ * Compile time checks:<br>
+ * The derived class has to implement 2 methods:<br>
+ * tupleToKeyValue() to convert input tuples to Kafka key/value objects<br>
+ * compareToLastMsg() to compare the incoming tuple with the last message received from Kafka so that the operator can skip tuples Kafka already has<br>
+ * <br>
+ * Run time checks:<br>
+ * None<br>
+ * <br>
+ * Benchmarks:<br>
+ * TBD<br>
+ * <br>
+ *
+ * @since 1.0.2
+ */
+
+public abstract class AbstractExactlyOnceKafkaOutputOperator<T, K, V> extends AbstractKafkaOutputOperator<K, V>
+{
+
+ private Map<Integer, Pair<byte[], byte[]>> lastMsgs;
+
+ private transient Partitioner partitioner;
+
+ private transient int partitionNum = 1;
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ super.setup(context);
+ try {
+ String className = (String) getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_PARTITIONER);
+ if (className != null) {
+ partitioner = (Partitioner) Class.forName(className).newInstance();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to initialize partitioner", e);
+ }
+    // read the last message from each kafka partition
+ initializeLastProcessingOffset();
+ }
+
+ public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>() {
+ @Override
+ public void process(T tuple)
+ {
+ Pair<K, V> keyValue = tupleToKeyValue(tuple);
+ int pid = 0;
+
+      if (partitioner != null) {
+        pid = partitioner.partition(keyValue.first, partitionNum);
+      }
+
+      Pair<byte[], byte[]> lastMsg = lastMsgs.get(pid);
+
+      if (lastMsg == null || compareToLastMsg(keyValue, lastMsg) > 0) {
+        getProducer().send(new KeyedMessage<K, V>(getTopic(), keyValue.first, keyValue.second));
+        logger.debug("Sent out message {}", keyValue.second);
+        sendCount++;
+      } else {
+        // ignore the tuple because kafka already has it
+        logger.debug("Ignore tuple {}", tuple);
+        return;
+      }
+ }
+ };
+
+ private void initializeLastProcessingOffset()
+ {
+
+ // read last received kafka message
+ TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(Sets.newHashSet((String)getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST)), this.getTopic());
+
+ if (tm == null) {
+ throw new RuntimeException("Failed to retrieve topic metadata");
+ }
+
+ partitionNum = tm.partitionsMetadata().size();
+
+ lastMsgs = new HashMap<Integer, Pair<byte[],byte[]>>(partitionNum);
+
+ for (PartitionMetadata pm : tm.partitionsMetadata()) {
+
+ String leadBroker = pm.leader().host();
+ int port = pm.leader().port();
+ String clientName = this.getClass().getName().replace('$', '.') + "_Client_" + tm.topic() + "_" + pm.partitionId();
+ SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
+
+ long readOffset = KafkaMetadataUtil.getLastOffset(consumer, tm.topic(), pm.partitionId(), kafka.api.OffsetRequest.LatestTime(), clientName);
+
+ FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(tm.topic(), pm.partitionId(), readOffset - 1, 100000).build();
+
+ FetchResponse fetchResponse = consumer.fetch(req);
+ for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(tm.topic(), pm.partitionId())) {
+
+ Message m = messageAndOffset.message();
+
+ ByteBuffer payload = m.payload();
+ ByteBuffer key = m.key();
+ byte[] valueBytes = new byte[payload.limit()];
+ byte[] keyBytes = new byte[key.limit()];
+ payload.get(valueBytes);
+ key.get(keyBytes);
+ lastMsgs.put(pm.partitionId(), new Pair<byte[], byte[]>(keyBytes, valueBytes));
+ }
+
+ }
+
+ }
+
+ /**
+   * Compare the incoming tuple with the last message received from Kafka.
+   *
+   * @param tupleKeyValue the incoming tuple as a key/value pair
+   * @param lastReceivedKeyValue the key/value pair of the last message received from Kafka
+   * @return a value &lt;= 0 if tupleKeyValue is supposed to come at or before lastReceivedKeyValue,
+   * &gt; 0 if tupleKeyValue comes after lastReceivedKeyValue
+ */
+ protected abstract int compareToLastMsg(Pair<K, V> tupleKeyValue, Pair<byte[], byte[]> lastReceivedKeyValue);
+
+ /**
+   * Tell the operator how to convert an input tuple to a Kafka key/value pair.
+   * @param tuple the input tuple
+   * @return the key/value pair to send to Kafka
+ */
+ protected abstract Pair<K, V> tupleToKeyValue(T tuple);
+
+ private static final Logger logger = LoggerFactory.getLogger(AbstractExactlyOnceKafkaOutputOperator.class);
+
+}
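For illustration only, not part of this patch: a minimal concrete subclass of the operator above, assuming tuples of the form "<sequence>###<payload>" so that the numeric sequence number becomes the Kafka message key and supplies the ordering the operator relies on. The package and class names below are hypothetical.

package com.example;  // hypothetical package

import com.datatorrent.common.util.Pair;
import com.datatorrent.contrib.kafka.AbstractExactlyOnceKafkaOutputOperator;

public class SequenceKeyedKafkaOutputOperator extends AbstractExactlyOnceKafkaOutputOperator<String, String, String>
{
  @Override
  protected Pair<String, String> tupleToKeyValue(String tuple)
  {
    // assumed tuple layout: "<sequence>###<payload>"; the sequence becomes the kafka key
    String[] parts = tuple.split("###", 2);
    return new Pair<String, String>(parts[0], parts[1]);
  }

  @Override
  protected int compareToLastMsg(Pair<String, String> tupleKeyValue, Pair<byte[], byte[]> lastReceivedKeyValue)
  {
    // keys are numeric sequence numbers: <= 0 means kafka already has the tuple, > 0 means it is new
    return Integer.parseInt(tupleKeyValue.first) - Integer.parseInt(new String(lastReceivedKeyValue.first));
  }
}

When wiring such a subclass into a DAG, the producer properties must include metadata.broker.list, which initializeLastProcessingOffset() uses to look up the topic metadata; partitioner.class is optional and, when set, is also used by the operator to pick the partition whose last message each tuple is compared against:

// assumes "dag" is the application DAG (e.g. inside StreamingApplication.populateDAG)
Properties props = new Properties();
props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
props.setProperty("metadata.broker.list", "localhost:9092");            // required for the metadata lookup
// props.setProperty("partitioner.class", "com.example.MyPartitioner"); // optional, hypothetical class
SequenceKeyedKafkaOutputOperator producer = dag.addOperator("KafkaProducer", SequenceKeyedKafkaOutputOperator.class);
producer.setConfigProperties(props);
producer.setTopic("topic1");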
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
index decbfa4..846b0ef 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
@@ -41,6 +41,10 @@
*/
public class KafkaMetadataUtil
{
+
+ public static final String PRODUCER_PROP_PARTITIONER = "partitioner.class";
+
+ public static final String PRODUCER_PROP_BROKERLIST = "metadata.broker.list";
private static Logger logger = LoggerFactory.getLogger(KafkaMetadataUtil.class);
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaExactlyOnceOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaExactlyOnceOutputOperatorTest.java
new file mode 100644
index 0000000..81e9fdd
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaExactlyOnceOutputOperatorTest.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.kafka;
+import com.datatorrent.api.ActivationListener;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.common.util.Pair;
+
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class KafkaExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase
+{
+ private static final Logger logger = LoggerFactory.getLogger(KafkaExactlyOnceOutputOperatorTest.class);
+ private static final int maxTuple = 40;
+ private static CountDownLatch latch;
+ private static boolean isRestarted = false;
+
+ /**
+ * Tuple generator for testing.
+ */
+ public static class StringGeneratorInputOperator implements InputOperator, ActivationListener<OperatorContext>
+ {
+ public final transient DefaultOutputPort<String> outputPort = new DefaultOutputPort<String>();
+ private final transient ArrayBlockingQueue<String> stringBuffer = new ArrayBlockingQueue<String>(1024);
+ private volatile Thread dataGeneratorThread;
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ }
+
+ @Override
+ public void endWindow()
+ {
+ }
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+
+ }
+
+ @Override
+ public void teardown()
+ {
+ }
+
+ @Override
+ public void activate(OperatorContext ctx)
+ {
+ dataGeneratorThread = new Thread("String Generator")
+ {
+ @Override
+ public void run()
+ {
+ try {
+ int i = 0;
+ while (dataGeneratorThread != null && i < maxTuple) {
+ stringBuffer.put((++i) + "###testString " + i);
+ }
+ stringBuffer.put((maxTuple + 1) + "###" + KafkaOperatorTestBase.END_TUPLE);
+ }
+ catch (InterruptedException ie) {
+ }
+ }
+ };
+ dataGeneratorThread.start();
+ }
+
+ @Override
+ public void deactivate()
+ {
+ dataGeneratorThread = null;
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ for (int i = stringBuffer.size(); i-- > 0;) {
+        if (i == 20 && !isRestarted) {
+          isRestarted = true;
+          // fail the operator once; when it comes back it resends everything
+ throw new RuntimeException();
+ }
+ outputPort.emit(stringBuffer.poll());
+ }
+ }
+ } // End of StringGeneratorInputOperator
+
+ /**
+   * Test AbstractExactlyOnceKafkaOutputOperator (i.e. an output adapter for Kafka, aka producer).
+   * This module sends data into a Kafka message bus.
+   *
+   * [Generate tuples] ==> [send tuples through the Kafka output adapter (i.e. producer) into the Kafka message bus, failing the producer at a certain point and bringing it back]
+   * ==> [receive data in an outside Kafka listener (i.e. consumer)] ==> verify Kafka did not receive duplicate messages
+ *
+ * @throws Exception
+ */
+ @Test
+ @SuppressWarnings({"rawtypes"})
+ public void testKafkaExactOnceOutputOperator() throws Exception
+ {
+ //initialize the latch to synchronize the threads
+ latch = new CountDownLatch(maxTuple);
+ // Setup a message listener to receive the message
+ KafkaTestConsumer listener = new KafkaTestConsumer("topic1");
+ listener.setLatch(latch);
+
+ // Malhar module to send message
+ // Create DAG for testing.
+ LocalMode lma = LocalMode.newInstance();
+ final DAG dag = lma.getDAG();
+
+ StringGeneratorInputOperator generator = dag.addOperator("TestStringGenerator", StringGeneratorInputOperator.class);
+ final SimpleKafkaExactOnceOutputOperator node = dag.addOperator("Kafka message producer", SimpleKafkaExactOnceOutputOperator.class);
+
+ Properties props = new Properties();
+ props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
+ props.put("metadata.broker.list", "localhost:9092");
+ props.setProperty("producer.type", "async");
+ props.setProperty("queue.buffering.max.ms", "200");
+ props.setProperty("queue.buffering.max.messages", "10");
+ props.setProperty("batch.num.messages", "5");
+
+ node.setConfigProperties(props);
+ // Set configuration parameters for Kafka
+ node.setTopic("topic1");
+
+ // Connect ports
+ dag.addStream("Kafka message", generator.outputPort, node.inputPort).setLocality(Locality.CONTAINER_LOCAL);
+
+
+ // Create local cluster
+ final LocalMode.Controller lc = lma.getController();
+ lc.runAsync();
+
+ Future f = Executors.newFixedThreadPool(1).submit(listener);
+ f.get(30, TimeUnit.SECONDS);
+
+ lc.shutdown();
+
+    // Check values sent vs received
+ Assert.assertEquals("Number of emitted tuples", maxTuple, listener.holdingBuffer.size());
+ logger.debug(String.format("Number of emitted tuples: %d", listener.holdingBuffer.size()));
+ Assert.assertEquals("First tuple", "testString 1", listener.getMessage(listener.holdingBuffer.peek()));
+
+ listener.close();
+
+ }
+
+ public static class SimpleKafkaExactOnceOutputOperator extends AbstractExactlyOnceKafkaOutputOperator<String, String, String>{
+
+ @Override
+ protected int compareToLastMsg(Pair<String, String> tupleKeyValue, Pair<byte[], byte[]> lastReceivedKeyValue)
+ {
+ return Integer.parseInt(tupleKeyValue.first) - Integer.parseInt(new String(lastReceivedKeyValue.first));
+ }
+
+ @Override
+ protected Pair<String, String> tupleToKeyValue(String tuple)
+ {
+ return new Pair<String, String>(tuple.split("###")[0], tuple.split("###")[1]);
+ }
+
+ }
+
+
+}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
index b78314a..f1b5013 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
@@ -21,11 +21,16 @@
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+
import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +44,7 @@
private static int tupleCount = 0;
private static final int maxTuple = 20;
private static CountDownLatch latch;
-
+
/**
* Tuple generator for testing.
*/
@@ -128,30 +133,41 @@
listener.setLatch(latch);
new Thread(listener).start();
- // Malhar module to send message
// Create DAG for testing.
LocalMode lma = LocalMode.newInstance();
+
+ StreamingApplication app = new StreamingApplication() {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ }
+ };
+
DAG dag = lma.getDAG();
// Create ActiveMQStringSinglePortOutputOperator
StringGeneratorInputOperator generator = dag.addOperator("TestStringGenerator", StringGeneratorInputOperator.class);
- KafkaSinglePortOutputOperator node = dag.addOperator("Kafka message producer", KafkaSinglePortOutputOperator.class);
-
+ KafkaSinglePortOutputOperator node = dag.addOperator("KafkaMessageProducer", KafkaSinglePortOutputOperator.class);
+
Properties props = new Properties();
props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
- props.put("metadata.broker.list", "localhost:9092");
+ props.put("metadata.broker.list", "invalidhost:9092");
props.setProperty("producer.type", "async");
props.setProperty("queue.buffering.max.ms", "200");
props.setProperty("queue.buffering.max.messages", "10");
props.setProperty("batch.num.messages", "5");
-
+
node.setConfigProperties(props);
- // Set configuration parameters for Kafka
node.setTopic("topic1");
// Connect ports
dag.addStream("Kafka message", generator.outputPort, node.inputPort).setLocality(Locality.CONTAINER_LOCAL);
+ // MLHR-1143: verify we can set broker list (and other properties) through configuration
+ Configuration conf = new Configuration(false);
+ conf.set("dt.operator.KafkaMessageProducer.prop.configProperties(metadata.broker.list)", "localhost:9092");
+ lma.prepareDAG(app, conf);
+
// Create local cluster
final LocalMode.Controller lc = lma.getController();
lc.runAsync();
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestConsumer.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestConsumer.java
index a2fa664..a276c14 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestConsumer.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestConsumer.java
@@ -108,7 +108,7 @@
latch.countDown();
}
if(getMessage(msg).equals(KafkaOperatorTestBase.END_TUPLE)){
- return;
+ break;
}
holdingBuffer.add(msg);
receiveCount++;
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
index cc3b593..0d64922 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
@@ -56,6 +56,9 @@
* <p/>
* Supports partitioning and dynamic changes to number of partitions through property {@link #partitionCount}. The
* directory scanner is responsible to only accept the files that belong to a partition.
+ * <p/>
+ * This class supports retrying of failed files by putting them into a failed list and retrying them after the pending
+ * files are processed. Retrying is disabled when maxRetryCount is set to zero.
*
* @since 1.0.2
*/
@@ -74,7 +77,52 @@
private int emitBatchSize = 1000;
private int currentPartitions = 1 ;
private int partitionCount = 1;
+ private int retryCount = 0;
+ private int maxRetryCount = 5;
+  protected transient int skipCount = 0;
+ /**
+   * Class representing a failed file. When a read fails in the middle of a file, the file is
+   * added to the failed list along with the last read offset.
+   * The files from the failed list are processed after all pending files are processed, but
+   * before checking for new files.
+   * A failed file is retried up to maxRetryCount times; after that the file is
+   * ignored.
+ */
+ protected static class FailedFile {
+ String path;
+ int offset;
+ int retryCount;
+ long lastFailedTime;
+
+ /* For kryo serialization */
+ protected FailedFile() {}
+
+ protected FailedFile(String path, int offset) {
+ this.path = path;
+ this.offset = offset;
+ this.retryCount = 0;
+ }
+
+ protected FailedFile(String path, int offset, int retryCount) {
+ this.path = path;
+ this.offset = offset;
+ this.retryCount = retryCount;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "FailedFile[" +
+ "path='" + path + '\'' +
+ ", offset=" + offset +
+ ", retryCount=" + retryCount +
+ ", lastFailedTime=" + lastFailedTime +
+ ']';
+ }
+ }
+  /* List of failed files */
+ private final Queue<FailedFile> failedFiles = new LinkedList<FailedFile>();
protected transient FileSystem fs;
protected transient Configuration configuration;
@@ -146,7 +194,8 @@
configuration = new Configuration();
fs = FileSystem.newInstance(filePath.toUri(), configuration);
if (currentFile != null && offset > 0) {
- LOG.info("Continue reading {} from index {}", currentFile, offset);
+ long startTime = System.currentTimeMillis();
+ LOG.info("Continue reading {} from index {} time={}", currentFile, offset, startTime);
int index = offset;
this.inputStream = openFile(new Path(currentFile));
// fast forward to previous offset
@@ -154,6 +203,7 @@
readEntity();
offset++;
}
+ LOG.info("Read offset={} records in setup time={}", offset, System.currentTimeMillis() - startTime);
}
}
catch (IOException ex) {
@@ -188,21 +238,25 @@
final public void emitTuples()
{
if (inputStream == null) {
- if (pendingFiles.isEmpty()) {
+ if (pendingFiles.isEmpty() && failedFiles.isEmpty()) {
if (System.currentTimeMillis() - scanIntervalMillis > lastScanMillis) {
pendingFiles = scanner.scan(fs, filePath, processedFiles);
lastScanMillis = System.currentTimeMillis();
}
}
- if (!pendingFiles.isEmpty())
- {
- Path path = pendingFiles.iterator().next();
- pendingFiles.remove(path);
- try {
+
+ try {
+ if (!pendingFiles.isEmpty()) {
+ Path path = pendingFiles.iterator().next();
+ pendingFiles.remove(path);
this.inputStream = openFile(path);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
}
+ else if (!failedFiles.isEmpty()) {
+ FailedFile ff = failedFiles.poll();
+ this.inputStream = retryFailedFile(ff);
+ }
+    } catch (IOException ex) {
+ throw new RuntimeException(ex);
}
}
@@ -216,8 +270,17 @@
closeFile(inputStream);
break;
}
- offset++;
- emit(line);
+
+      /* If skipCount is non-zero, recovery of a failed file is in progress; skipCount is
+       * used to prevent already emitted records from being emitted again during recovery.
+       * When a failed file is opened, skipCount is set to the last read offset for that file.
+ */
+ if (skipCount == 0) {
+ offset++;
+ emit(line);
+ }
+ else
+ skipCount--;
}
} catch (IOException e) {
LOG.error("FS reader error", e);
@@ -226,20 +289,61 @@
}
}
+ protected void addToFailedList() throws IOException {
+
+ FailedFile ff = new FailedFile(currentFile, offset, retryCount);
+
+ // try to close file
+ if (this.inputStream != null)
+ this.inputStream.close();
+
+    ff.retryCount++;
+ ff.lastFailedTime = System.currentTimeMillis();
+ ff.offset = this.offset;
+
+ // Clear current file state.
+ this.currentFile = null;
+ this.inputStream = null;
+ this.offset = 0;
+
+ if (ff.retryCount > maxRetryCount)
+ return;
+
+ LOG.info("adding to failed list path {} offset {} retry {}", ff.path, ff.offset, ff.retryCount);
+ failedFiles.add(ff);
+ }
+
+ protected InputStream retryFailedFile(FailedFile ff) throws IOException
+ {
+ LOG.info("retrying failed file {} offset {} retry {}", ff.path, ff.offset, ff.retryCount);
+ String path = ff.path;
+ this.inputStream = openFile(new Path(path));
+ this.offset = ff.offset;
+ this.retryCount = ff.retryCount;
+ this.skipCount = ff.offset;
+ return this.inputStream;
+ }
+
protected InputStream openFile(Path path) throws IOException
{
LOG.info("opening file {}", path);
InputStream input = fs.open(path);
currentFile = path.toString();
offset = 0;
+ retryCount = 0;
+ skipCount = 0;
return input;
}
protected void closeFile(InputStream is) throws IOException
{
- is.close();
- is = null;
- processedFiles.add(currentFile);
+ LOG.info("closing file {} offset {}", currentFile, offset);
+
+ if (is != null)
+ is.close();
+ if (currentFile != null)
+ processedFiles.add(currentFile);
+
currentFile = null;
inputStream = null;
}
@@ -267,9 +371,11 @@
Set<String> totalProcessedFiles = new HashSet<String>();
List<Pair<String, Integer>> currentFiles = new ArrayList<Pair<String, Integer>>();
List<DirectoryScanner> oldscanners = new LinkedList<DirectoryScanner>();
+ List<FailedFile> totalFailedFiles = new LinkedList<FailedFile>();
for(Partition<AbstractFSDirectoryInputOperator<T>> partition : partitions) {
AbstractFSDirectoryInputOperator<T> oper = partition.getPartitionedInstance();
totalProcessedFiles.addAll(oper.processedFiles);
+ totalFailedFiles.addAll(oper.failedFiles);
if (oper.currentFile != null)
currentFiles.add(new Pair<String, Integer>(oper.currentFile, oper.offset));
oldscanners.add(oper.getScanner());
@@ -288,7 +394,7 @@
DirectoryScanner scn = scanners.get(i);
oper.setScanner(scn);
- // Do state transfer
+ // Do state transfer for processed files.
oper.processedFiles.addAll(totalProcessedFiles);
/* set current scanning directory and offset */
@@ -302,6 +408,17 @@
}
}
+ /* transfer failed files */
+ oper.failedFiles.clear();
+ Iterator<FailedFile> iter = totalFailedFiles.iterator();
+ while (iter.hasNext()) {
+ FailedFile ff = iter.next();
+ if (scn.acceptFile(ff.path)) {
+ oper.failedFiles.add(ff);
+ iter.remove();
+ }
+ }
+
newPartitions.add(new DefaultPartition<AbstractFSDirectoryInputOperator<T>>(oper));
}
@@ -344,6 +461,16 @@
return res;
}
+ public int getMaxRetryCount()
+ {
+ return maxRetryCount;
+ }
+
+ public void setMaxRetryCount(int maxRetryCount)
+ {
+ this.maxRetryCount = maxRetryCount;
+ }
+
public static class DirectoryScanner implements Serializable
{
private static final long serialVersionUID = 4535844463258899929L;
@@ -351,7 +478,7 @@
private transient Pattern regex = null;
private int partitionIndex;
private int partitionCount;
- protected final transient HashSet<String> ignoredFiles = new HashSet<String>();
+ private final transient HashSet<String> ignoredFiles = new HashSet<String>();
public String getFilePatternRegexp()
{
@@ -364,10 +491,24 @@
this.regex = null;
}
+ public Pattern getRegex() {
+ if (this.regex == null && this.filePatternRegexp != null)
+ this.regex = Pattern.compile(this.filePatternRegexp);
+ return this.regex;
+ }
+
+ public int getPartitionCount() {
+ return partitionCount;
+ }
+
+ public int getPartitionIndex() {
+ return partitionIndex;
+ }
+
public LinkedHashSet<Path> scan(FileSystem fs, Path filePath, Set<String> consumedFiles)
{
if (filePatternRegexp != null && this.regex == null) {
- this.regex = Pattern.compile(this.filePatternRegexp);
+ this.regex = Pattern.compile(this.filePatternRegexp);
}
LinkedHashSet<Path> pathSet = Sets.newLinkedHashSet();
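For illustration only, not part of this patch: how an application might enable the retry behavior described above on a concrete file-reading operator. The subclass name LineByLineFileInputOperator and the directory/pattern values are hypothetical; setDirectory(), getScanner().setFilePatternRegexp() and the setMaxRetryCount() added in this change are the operator properties actually used.

// assumes "dag" is the application DAG and LineByLineFileInputOperator is a concrete
// subclass of AbstractFSDirectoryInputOperator<String> (hypothetical name)
LineByLineFileInputOperator reader = dag.addOperator("FileReader", LineByLineFileInputOperator.class);
reader.setDirectory("/data/incoming");                 // directory to scan for input files
reader.getScanner().setFilePatternRegexp(".*\\.log");  // only pick up files matching this pattern
reader.setMaxRetryCount(3);                            // retry a failed file at most 3 times; 0 disables retrying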
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperatorFailureHandlingTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperatorFailureHandlingTest.java
new file mode 100644
index 0000000..f472dc8
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperatorFailureHandlingTest.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import com.datatorrent.api.*;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.google.common.collect.*;
+import java.io.*;
+import java.util.*;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.*;
+import org.junit.*;
+import org.junit.rules.TestWatcher;
+
+public class AbstractFSDirectoryInputOperatorFailureHandlingTest
+{
+ public static class TestMeta extends TestWatcher
+ {
+ public String dir = null;
+
+ @Override
+ protected void starting(org.junit.runner.Description description)
+ {
+ String methodName = description.getMethodName();
+ String className = description.getClassName();
+ this.dir = "target/" + className + "/" + methodName;
+ }
+ };
+
+ @Rule public TestMeta testMeta = new TestMeta();
+
+ public static class TestFSDirectoryInputOperator extends AbstractFSDirectoryInputOperator<String>
+ {
+ @OutputPortFieldAnnotation(name = "output")
+ public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
+ private transient BufferedReader br = null;
+ int count = 0;
+
+ @Override
+ protected InputStream openFile(Path path) throws IOException
+ {
+ InputStream is = super.openFile(path);
+ br = new BufferedReader(new InputStreamReader(is));
+ count = 0;
+ return is;
+ }
+
+ @Override
+ protected void closeFile(InputStream is) throws IOException
+ {
+ super.closeFile(is);
+ br.close();
+ count = 0;
+ br = null;
+ }
+
+ @Override protected InputStream retryFailedFile(FailedFile ff) throws IOException
+ {
+ count = 0;
+ return super.retryFailedFile(ff);
+ }
+
+ @Override
+ protected String readEntity() throws IOException
+ {
+ if (count != 0 && (count % 4) == 0) {
+ addToFailedList();
+ return null;
+ }
+ String str = br.readLine();
+ return str;
+ }
+
+ @Override
+ protected void emit(String tuple)
+ {
+ output.emit(tuple);
+ count++;
+ }
+ }
+
+ @Test
+ public void testFailureHandling() throws Exception
+ {
+ FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+ HashSet<String> allLines = Sets.newHashSet();
+    // Create 10 files with 10 records each (100 records total).
+ for (int file=0; file<10; file++) {
+ HashSet<String> lines = Sets.newHashSet();
+ for (int line=0; line<10; line++) {
+ lines.add("f"+file+"l"+line);
+ }
+ allLines.addAll(lines);
+ FileUtils.write(new File(testMeta.dir, "file"+file), StringUtils.join(lines, '\n'));
+ }
+
+ Thread.sleep(10);
+
+ TestFSDirectoryInputOperator oper = new TestFSDirectoryInputOperator();
+
+ CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+ oper.output.setSink(sink);
+
+ oper.setDirectory(testMeta.dir);
+ oper.getScanner().setFilePatternRegexp(".*file[\\d]");
+
+ oper.setup(null);
+ for (long wid=0; wid<1000; wid++) {
+ oper.beginWindow(wid);
+ oper.emitTuples();
+ oper.endWindow();
+ }
+ oper.teardown();
+
+ Assert.assertEquals("number tuples", 100, queryResults.collectedTuples.size());
+ Assert.assertEquals("lines", allLines, new HashSet<String>(queryResults.collectedTuples));
+
+ }
+}