JdbcInput and HDFS output example app
SPOI-8251 jdbc to jdbc app
diff --git a/examples/jdbcIngest/.gitignore b/examples/jdbcIngest/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/examples/jdbcIngest/.gitignore
@@ -0,0 +1 @@
+/target/
diff --git a/examples/jdbcIngest/README.md b/examples/jdbcIngest/README.md
new file mode 100644
index 0000000..ec01985
--- /dev/null
+++ b/examples/jdbcIngest/README.md
@@ -0,0 +1,65 @@
+## Sample MySQL implementation
+
+This project contains two applications that read records from a MySQL table, create POJOs and write them to a file
+in a user-specified directory in HDFS.
+
+1. SimpleJdbcToHDFSApp: Reads table records as per the given query and emits them as POJOs.
+2. PollJdbcToHDFSApp: Reads table records in parallel using partitions, polls for newly **appended** records, and emits them as POJOs.
+
+Follow these steps to run these applications:
+
+**Step 1**: Update these properties in the file `src/main/resources/META-INF/properties-<applicationName>.xml`:
+
+| Property Name  | Description |
+| -------------  | ----------- |
+| dt.application.<applicationName>.operator.JdbcInput.prop.store.databaseUrl | database URL of the form `jdbc:mysql://hostName:portNumber/dbName` |
+| dt.application.<applicationName>.operator.JdbcInput.prop.store.userName | MySQL user name |
+| dt.application.<applicationName>.operator.JdbcInput.prop.store.password | MySQL user password |
+| dt.application.<applicationName>.operator.FileOutputOperator.filePath   | HDFS output directory path |
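+
+For example, the database URL entry for `SimpleJdbcToHDFSApp` might look like this (host, port and database name are placeholders; substitute your own):
+
+    <property>
+      <name>dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.store.databaseUrl</name>
+      <value>jdbc:mysql://localhost:3306/testDev</value>
+    </property>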
+
+**Step 2**: Create database table and add entries
+
+Go to the MySQL console and run (where _{path}_ is a suitable prefix):
+
+    mysql> source {path}/src/test/resources/example.sql
+
+After this, please verify that `testDev.test_event_table` is created and has 10 rows:
+
+    mysql> select count(*) from testDev.test_event_table;
+    +----------+
+    | count(*) |
+    +----------+
+    |       10 |
+    +----------+
+
+**Step 3**: Create HDFS output directory if not already present (_{path}_ should be the same as specified in `META-INF/properties-<applicationName>.xml`):
+
+    hadoop fs -mkdir -p {path}
+
+**Step 4**: Build the code:
+
+    shell> mvn clean install
+
+Upload the `target/jdbcInput-1.0-SNAPSHOT.apa` to the UI console if available, or launch it from
+the command line using the Apex CLI (`apex`).
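+
+For example, launching from the Apex CLI with the custom configuration file described in Step 5 might look like this (a sketch; paths assume you run from the project root):
+
+    apex> launch target/jdbcInput-1.0-SNAPSHOT.apa -conf src/main/resources/META-INF/properties-SimpleJdbcToHDFSApp.xml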
+
+**Step 5**: During launch use `src/main/resources/META-INF/properties-<applicationName>.xml` as a custom configuration file; then verify
+that the output directory has the expected output:
+
+    shell> hadoop fs -cat <hadoop directory path>/2_op.dat.* | wc -l
+
+This should return 10 as the count.
+
+Sample Output:
+
+    hadoop fs -cat <path_to_file>/2_op.dat.0
+    PojoEvent [accountNumber=1, name=User1, amount=1000]
+    PojoEvent [accountNumber=2, name=User2, amount=2000]
+    PojoEvent [accountNumber=3, name=User3, amount=3000]
+    PojoEvent [accountNumber=4, name=User4, amount=4000]
+    PojoEvent [accountNumber=5, name=User5, amount=5000]
+    PojoEvent [accountNumber=6, name=User6, amount=6000]
+    PojoEvent [accountNumber=7, name=User7, amount=7000]
+    PojoEvent [accountNumber=8, name=User8, amount=8000]
+    PojoEvent [accountNumber=9, name=User9, amount=9000]
+    PojoEvent [accountNumber=10, name=User10, amount=1000]
diff --git a/examples/jdbcIngest/XmlJavadocCommentsExtractor.xsl b/examples/jdbcIngest/XmlJavadocCommentsExtractor.xsl
new file mode 100644
index 0000000..08075a9
--- /dev/null
+++ b/examples/jdbcIngest/XmlJavadocCommentsExtractor.xsl
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+            http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+
+<!--
+    Document   : XmlJavadocCommentsExtractor.xsl
+    Created on : September 16, 2014, 11:30 AM
+    Description:
+        The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet.
+-->
+
+<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
+  <xsl:output method="xml" standalone="yes"/>
+
+  <!-- copy xml by selecting only the following nodes, attributes and text -->
+  <xsl:template match="node()|text()|@*">
+    <xsl:copy>
+      <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
+    </xsl:copy>
+  </xsl:template>
+
+  <!-- Strip off the following paths from the selected xml -->
+  <xsl:template match="//root/package/interface/interface
+                      |//root/package/interface/method/@qualified
+                      |//root/package/class/interface
+                      |//root/package/class/class
+                      |//root/package/class/method/@qualified
+                      |//root/package/class/field/@qualified" />
+
+  <xsl:strip-space elements="*"/>
+</xsl:stylesheet>
diff --git a/examples/jdbcIngest/pom.xml b/examples/jdbcIngest/pom.xml
new file mode 100644
index 0000000..f9288b8
--- /dev/null
+++ b/examples/jdbcIngest/pom.xml
@@ -0,0 +1,298 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.example</groupId>
+  <version>1.0-SNAPSHOT</version>
+  <artifactId>jdbcInput</artifactId>
+  <packaging>jar</packaging>
+
+  <!-- change these to the appropriate values -->
+  <name>JDBC Input Operator</name>
+  <description>Example Uses of JDBC Input Operator</description>
+
+  <properties>
+    <!-- change this if you desire to use a different version of Apex Core -->
+    <apex.version>3.5.0</apex.version>
+    <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
+    <malhar.version>3.6.0</malhar.version>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-eclipse-plugin</artifactId>
+        <version>2.9</version>
+        <configuration>
+          <downloadSources>true</downloadSources>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.3</version>
+        <configuration>
+          <encoding>UTF-8</encoding>
+          <source>1.7</source>
+          <target>1.7</target>
+          <debug>true</debug>
+          <optimize>false</optimize>
+          <showDeprecation>true</showDeprecation>
+          <showWarnings>true</showWarnings>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>2.8</version>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>prepare-package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>target/deps</outputDirectory>
+              <includeScope>runtime</includeScope>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>app-package-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
+              <appendAssemblyId>false</appendAssemblyId>
+              <descriptors>
+                <descriptor>src/assemble/appPackage.xml</descriptor>
+              </descriptors>
+              <archiverConfig>
+                <defaultDirectoryMode>0755</defaultDirectoryMode>
+              </archiverConfig>
+              <archive>
+                <manifestEntries>
+                  <Class-Path>${apex.apppackage.classpath}</Class-Path>
+                  <DT-Engine-Version>${apex.version}</DT-Engine-Version>
+                  <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
+                  <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
+                  <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
+                  <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
+                  <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
+                </manifestEntries>
+              </archive>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <version>1.7</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <configuration>
+              <target>
+                <move
+                  file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
+                  tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
+              </target>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+          <execution>
+            <!-- create resource directory for xml javadoc -->
+            <id>createJavadocDirectory</id>
+            <phase>generate-resources</phase>
+            <configuration>
+              <tasks>
+                <delete
+                  dir="${project.build.directory}/generated-resources/xml-javadoc" />
+                <mkdir
+                  dir="${project.build.directory}/generated-resources/xml-javadoc" />
+              </tasks>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.9.1</version>
+        <executions>
+          <execution>
+            <id>attach-artifacts</id>
+            <phase>package</phase>
+            <goals>
+              <goal>attach-artifact</goal>
+            </goals>
+            <configuration>
+              <artifacts>
+                <artifact>
+                  <file>target/${project.artifactId}-${project.version}.apa</file>
+                  <type>apa</type>
+                </artifact>
+              </artifacts>
+              <skipAttach>false</skipAttach>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- generate javadoc -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <executions>
+          <!-- generate xml javadoc -->
+          <execution>
+            <id>xml-doclet</id>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>javadoc</goal>
+            </goals>
+            <configuration>
+              <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
+              <additionalparam>-d
+                ${project.build.directory}/generated-resources/xml-javadoc
+                -filename
+                ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
+              <useStandardDocletOptions>false</useStandardDocletOptions>
+              <docletArtifact>
+                <groupId>com.github.markusbernhardt</groupId>
+                <artifactId>xml-doclet</artifactId>
+                <version>1.0.4</version>
+              </docletArtifact>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- Transform xml javadoc to stripped down version containing only 
+        class/interface comments and tags -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>xml-maven-plugin</artifactId>
+        <version>1.0</version>
+        <executions>
+          <execution>
+            <id>transform-xmljavadoc</id>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>transform</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <transformationSets>
+            <transformationSet>
+              <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
+              <includes>
+                <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+              </includes>
+              <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
+              <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
+            </transformationSet>
+          </transformationSets>
+        </configuration>
+      </plugin>
+      <!-- copy xml javadoc to class jar -->
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <version>2.6</version>
+        <executions>
+          <execution>
+            <id>copy-resources</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${basedir}/target/classes</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
+                  <includes>
+                    <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+                  </includes>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+
+  </build>
+
+  <dependencies>
+    <!-- add your dependencies here -->
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${malhar.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>apex-common</artifactId>
+      <version>${apex.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.10</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>apex-engine</artifactId>
+      <version>${apex.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <version>5.1.36</version>
+    </dependency>
+    <dependency>
+      <groupId>org.jooq</groupId>
+      <artifactId>jooq</artifactId>
+      <version>3.6.4</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.janino</groupId>
+      <artifactId>janino</artifactId>
+      <version>2.7.8</version>
+    </dependency>
+    <dependency>
+      <groupId>org.hsqldb</groupId>
+      <artifactId>hsqldb</artifactId>
+      <version>2.3.1</version>
+    </dependency>
+
+  </dependencies>
+
+</project>
diff --git a/examples/jdbcIngest/src/assemble/appPackage.xml b/examples/jdbcIngest/src/assemble/appPackage.xml
new file mode 100644
index 0000000..7ad071c
--- /dev/null
+++ b/examples/jdbcIngest/src/assemble/appPackage.xml
@@ -0,0 +1,43 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/resources</directory>
+      <outputDirectory>/resources</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+
diff --git a/examples/jdbcIngest/src/main/java/com/example/mydtapp/FileLineOutputOperator.java b/examples/jdbcIngest/src/main/java/com/example/mydtapp/FileLineOutputOperator.java
new file mode 100644
index 0000000..e155f23
--- /dev/null
+++ b/examples/jdbcIngest/src/main/java/com/example/mydtapp/FileLineOutputOperator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.example.mydtapp;
+
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
+
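+/**
+ * Writes each tuple as a line of text to a file named
+ * {@code <operatorId>_op.dat} under the configured filePath (part-number
+ * suffixes such as .0 are appended by the base class).
+ */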
+public class FileLineOutputOperator extends AbstractFileOutputOperator<Object>
+{
+  @Override
+  protected String getFileName(Object input)
+  {
+    return context.getId() + "_" + "op.dat";
+  }
+
+  @Override
+  protected byte[] getBytesForTuple(Object input)
+  {
+    return (input.toString() + "\n").getBytes();
+  }
+}
diff --git a/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcHDFSApp.java b/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcHDFSApp.java
new file mode 100644
index 0000000..5605bcf
--- /dev/null
+++ b/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcHDFSApp.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.example.mydtapp;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcStore;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+
+@ApplicationAnnotation(name = "SimpleJdbcToHDFSApp")
+public class JdbcHDFSApp implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    JdbcPOJOInputOperator jdbcInputOperator = dag.addOperator("JdbcInput", new JdbcPOJOInputOperator());
+    /**
+     * The TUPLE_CLASS attribute below can be set to a user-defined class that
+     * matches the input table schema; addFieldInfos() must then be updated
+     * accordingly. The line is commented out here because the class is set
+     * from the properties file instead.
+     */
+    // dag.setOutputPortAttribute(jdbcInputOperator.outputPort, Context.PortContext.TUPLE_CLASS, PojoEvent.class);
+
+    jdbcInputOperator.setFieldInfos(addFieldInfos());
+
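+    // connection details (driver, URL, credentials) for this store are
+    // supplied via the properties file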
+    JdbcStore store = new JdbcStore();
+    jdbcInputOperator.setStore(store);
+
+    FileLineOutputOperator fileOutput = dag.addOperator("FileOutputOperator", new FileLineOutputOperator());
+
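+    // CONTAINER_LOCAL deploys the reader and writer in the same container,
+    // avoiding network transfer between them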
+    dag.addStream("POJO's", jdbcInputOperator.outputPort, fileOutput.input).setLocality(Locality.CONTAINER_LOCAL);
+  }
+
+  /**
+   * This method can be modified to provide field mappings based on a
+   * user-defined class.
+   */
+  private List<FieldInfo> addFieldInfos()
+  {
+    List<FieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new FieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER));
+    fieldInfos.add(new FieldInfo("NAME", "name", SupportType.STRING));
+    fieldInfos.add(new FieldInfo("AMOUNT", "amount", SupportType.INTEGER));
+    return fieldInfos;
+  }
+
+}
diff --git a/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcPollerApplication.java b/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcPollerApplication.java
new file mode 100644
index 0000000..54d71f7
--- /dev/null
+++ b/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcPollerApplication.java
@@ -0,0 +1,48 @@
+package com.example.mydtapp;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOPollInputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcStore;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.google.common.collect.Lists;
+
+@ApplicationAnnotation(name = "PollJdbcToHDFSApp")
+public class JdbcPollerApplication implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    JdbcPOJOPollInputOperator poller = dag.addOperator("JdbcPoller", new JdbcPOJOPollInputOperator());
+
+    JdbcStore store = new JdbcStore();
+    poller.setStore(store);
+
+    poller.setFieldInfos(addFieldInfos());
+
+    FileLineOutputOperator writer = dag.addOperator("Writer", new FileLineOutputOperator());
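+    // partition the writer in parallel with the poller and rotate output
+    // part files every 60 application windows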
+    dag.setInputPortAttribute(writer.input, PortContext.PARTITION_PARALLEL, true);
+    writer.setRotationWindows(60);
+
+    dag.addStream("dbrecords", poller.outputPort, writer.input);
+  }
+
+  /**
+   * This method can be modified to provide field mappings based on a
+   * user-defined class.
+   */
+  private List<FieldInfo> addFieldInfos()
+  {
+    List<FieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new FieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER));
+    fieldInfos.add(new FieldInfo("NAME", "name", SupportType.STRING));
+    fieldInfos.add(new FieldInfo("AMOUNT", "amount", SupportType.INTEGER));
+    return fieldInfos;
+  }
+}
diff --git a/examples/jdbcIngest/src/main/java/com/example/mydtapp/PojoEvent.java b/examples/jdbcIngest/src/main/java/com/example/mydtapp/PojoEvent.java
new file mode 100644
index 0000000..f56522b
--- /dev/null
+++ b/examples/jdbcIngest/src/main/java/com/example/mydtapp/PojoEvent.java
@@ -0,0 +1,44 @@
+package com.example.mydtapp;
+
+public class PojoEvent
+{
+  @Override
+  public String toString()
+  {
+    return "PojoEvent [accountNumber=" + accountNumber + ", name=" + name + ", amount=" + amount + "]";
+  }
+
+  private int accountNumber;
+  private String name;
+  private int amount;
+
+  public int getAccountNumber()
+  {
+    return accountNumber;
+  }
+
+  public void setAccountNumber(int accountNumber)
+  {
+    this.accountNumber = accountNumber;
+  }
+
+  public String getName()
+  {
+    return name;
+  }
+
+  public void setName(String name)
+  {
+    this.name = name;
+  }
+
+  public int getAmount()
+  {
+    return amount;
+  }
+
+  public void setAmount(int amount)
+  {
+    this.amount = amount;
+  }
+}
diff --git a/examples/jdbcIngest/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml b/examples/jdbcIngest/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml
new file mode 100644
index 0000000..6e7aaf6
--- /dev/null
+++ b/examples/jdbcIngest/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0"?>
+<configuration>
+    <!-- Static partitioning: the partition count decides how many key
+        ranges are initiated -->
+    <property>
+        <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.partitionCount</name>
+        <value>2</value>
+    </property>
+
+    <property>
+        <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.databaseDriver</name>
+        <value>com.mysql.jdbc.Driver</value>
+    </property>
+
+    <property>
+        <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.databaseUrl</name>
+        <value>jdbc:mysql://localhost:3306/testDev</value>
+    </property>
+
+    <property>
+        <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.userName</name>
+        <value>root</value>
+    </property>
+    
+    <property>
+        <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.password</name>
+        <value>mysql</value>
+    </property>
+
+    <!-- Batch size for poller -->
+    <property>
+        <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.batchSize</name>
+        <value>300</value>
+    </property>
+
+    <!-- Look-up key for forming range queries; this is the column name
+        on which the table is sorted -->
+    <property>
+        <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.key</name>
+        <value>ACCOUNT_NO</value>
+    </property>
+
+    <property>
+        <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.columnsExpression</name>
+        <value>ACCOUNT_NO,NAME,AMOUNT</value>
+    </property>
+    <property>
+      <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.port.outputPort.attr.TUPLE_CLASS</name>
+      <value>com.example.mydtapp.PojoEvent</value>
+    </property>
+
+    <!-- Table name -->
+    <property>
+        <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.tableName</name>
+        <value>test_event_table</value>
+    </property>
+
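+    <!-- Poll interval in milliseconds -->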
+    <property>
+        <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.pollInterval</name>
+        <value>1000</value>
+    </property>
+
+    <!-- Output folder for HDFS output operator -->
+    <property>
+        <name>dt.application.PollJdbcToHDFSApp.operator.Writer.filePath</name>
+        <value>/tmp/test/output</value>
+    </property>
+
+  <property>
+    <name>dt.loggers.level</name>
+    <value>com.datatorrent.*:DEBUG,org.apache.*:INFO</value>
+  </property>
+</configuration>
diff --git a/examples/jdbcIngest/src/main/resources/META-INF/properties-SimpleJdbcToHDFSApp.xml b/examples/jdbcIngest/src/main/resources/META-INF/properties-SimpleJdbcToHDFSApp.xml
new file mode 100644
index 0000000..9fce7f8
--- /dev/null
+++ b/examples/jdbcIngest/src/main/resources/META-INF/properties-SimpleJdbcToHDFSApp.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0"?>
+<configuration>
+  <!-- <property> <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name> 
+    <value>some-default-value (if value is not specified, it is required from 
+    the user or custom config when launching)</value> </property> -->
+  <!-- memory assigned to app master <property> <name>dt.attr.MASTER_MEMORY_MB</name> 
+    <value>1024</value> </property> -->
+
+  <!-- JDBC driver in use -->
+  <property>
+    <name>dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.store.databaseDriver
+    </name>
+    <value>org.hsqldb.jdbcDriver</value>
+  </property>
+
+  <!-- URL to connect to the DB master -->
+  <property>
+    <name>dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.store.databaseUrl
+    </name>
+    <value>jdbc:hsqldb:mem:test</value>
+  </property>
+
+  <!-- number of rows that the operator can retrieve in a window -->
+  <property>
+    <name>dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.fetchSize
+    </name>
+    <value>50</value>
+  </property>
+
+  <!-- Query to fetch data -->
+  <property>
+    <name>dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.query
+    </name>
+    <value>select * from test_event_table
+    </value>
+  </property>
+
+  <!-- Table name -->
+  <property>
+    <name>dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.tableName
+    </name>
+    <value>test_event_table</value>
+  </property>
+
+  <!-- POJO class -->
+  <property>
+    <name>dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.port.outputPort.attr.TUPLE_CLASS
+    </name>
+    <value>com.example.mydtapp.PojoEvent</value>
+  </property>
+
+  <!-- Output folder for HDFS output operator -->
+  <property>
+    <name>dt.application.SimpleJdbcToHDFSApp.operator.FileOutputOperator.filePath
+    </name>
+    <value>/tmp/jdbcApp</value>
+  </property>
+
+  <property>
+    <name>dt.application.SimpleJdbcToHDFSApp.operator.FileOutputOperator.rotationWindows
+    </name>
+    <value>5</value>
+  </property>
+
+</configuration>
+
diff --git a/examples/jdbcIngest/src/test/java/com/example/mydtapp/ApplicationTest.java b/examples/jdbcIngest/src/test/java/com/example/mydtapp/ApplicationTest.java
new file mode 100644
index 0000000..fb78944
--- /dev/null
+++ b/examples/jdbcIngest/src/test/java/com/example/mydtapp/ApplicationTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.example.mydtapp;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Test the DAG declaration in local mode.<br>
+ * This test assumes that test_event_table has already been created.
+ */
+public class ApplicationTest
+{
+
+  @Test
+  @Ignore
+  public void testApplication() throws Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SimpleJdbcToHDFSApp.xml"));
+      lma.prepareDAG(new JdbcHDFSApp(), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.run(10000); // runs for 10 seconds and quits
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+}
diff --git a/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcInputAppTest.java b/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcInputAppTest.java
new file mode 100644
index 0000000..1d95f4d
--- /dev/null
+++ b/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcInputAppTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.example.mydtapp;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Application test for {@link JdbcHDFSApp}
+ */
+public class JdbcInputAppTest
+{
+  private static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
+  private static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+  private static final String TABLE_NAME = "test_event_table";
+  private static final String FILE_NAME = "/tmp/jdbcApp";
+
+  @BeforeClass
+  public static void setup()
+  {
+    try {
+      cleanup();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    try {
+      Class.forName(DB_DRIVER).newInstance();
+
+      Connection con = DriverManager.getConnection(URL);
+      Statement stmt = con.createStatement();
+
+      String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+          + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)";
+      stmt.executeUpdate(createTable);
+      cleanTable();
+      insertEventsInTable(10, 0);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @AfterClass
+  public static void cleanup()
+  {
+    try {
+      FileUtils.deleteDirectory(new File(FILE_NAME));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static void cleanTable()
+  {
+    try {
+      Connection con = DriverManager.getConnection(URL);
+      Statement stmt = con.createStatement();
+      String cleanTable = "delete from " + TABLE_NAME;
+      stmt.executeUpdate(cleanTable);
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static void insertEventsInTable(int numEvents, int offset)
+  {
+    try {
+      Connection con = DriverManager.getConnection(URL);
+      String insert = "insert into " + TABLE_NAME + " values (?,?,?)";
+      PreparedStatement stmt = con.prepareStatement(insert);
+      for (int i = 0; i < numEvents; i++, offset++) {
+        stmt.setInt(1, offset);
+        stmt.setString(2, "Account_Holder-" + offset);
+        stmt.setInt(3, (offset * 1000));
+        stmt.executeUpdate();
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testApplication() throws Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SimpleJdbcToHDFSApp.xml"));
+      lma.prepareDAG(new JdbcHDFSApp(), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync();
+
+      // wait for output files to roll      
+      Thread.sleep(5000);
+
+      String[] extensions = { "dat.0", "tmp" };
+      Collection<File> list = FileUtils.listFiles(new File(FILE_NAME), extensions, false);
+      Assert.assertEquals("Records in file", 10, FileUtils.readLines(list.iterator().next()).size());
+
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+}
diff --git a/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcPollerApplicationTest.java b/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcPollerApplicationTest.java
new file mode 100644
index 0000000..b96d4ae
--- /dev/null
+++ b/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcPollerApplicationTest.java
@@ -0,0 +1,128 @@
+package com.example.mydtapp;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+
+import javax.validation.ConstraintViolationException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datatorrent.api.LocalMode;
+
+public class JdbcPollerApplicationTest
+{
+  private static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
+  private static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+  private static final String TABLE_NAME = "test_event_table";
+  private static final String OUTPUT_DIR_NAME = "/tmp/test/output";
+
+  @BeforeClass
+  public static void setup()
+  {
+    try {
+      cleanup();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    try {
+      Class.forName(DB_DRIVER).newInstance();
+
+      Connection con = DriverManager.getConnection(URL);
+      Statement stmt = con.createStatement();
+
+      String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+          + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)";
+      stmt.executeUpdate(createTable);
+      cleanTable();
+      insertEventsInTable(10, 0);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @AfterClass
+  public static void cleanup()
+  {
+    try {
+      FileUtils.deleteDirectory(new File(OUTPUT_DIR_NAME));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static void cleanTable()
+  {
+    try {
+      Connection con = DriverManager.getConnection(URL);
+      Statement stmt = con.createStatement();
+      String cleanTable = "delete from " + TABLE_NAME;
+      stmt.executeUpdate(cleanTable);
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static void insertEventsInTable(int numEvents, int offset)
+  {
+    try {
+      Connection con = DriverManager.getConnection(URL);
+      String insert = "insert into " + TABLE_NAME + " values (?,?,?)";
+      PreparedStatement stmt = con.prepareStatement(insert);
+      for (int i = 0; i < numEvents; i++, offset++) {
+        stmt.setInt(1, offset);
+        stmt.setString(2, "Account_Holder-" + offset);
+        stmt.setInt(3, (offset * 1000));
+        stmt.executeUpdate();
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testApplication() throws Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.databaseUrl", URL);
+      conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.databaseDriver", DB_DRIVER);
+      conf.setInt("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.partitionCount", 2);
+      conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.key", "ACCOUNT_NO");
+      conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.columnsExpression", "ACCOUNT_NO,NAME,AMOUNT");
+      conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.tableName", TABLE_NAME);
+      conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.port.outputPort.attr.TUPLE_CLASS",
+          "com.example.mydtapp.PojoEvent");
+      conf.set("dt.application.PollJdbcToHDFSApp.operator.Writer.filePath", OUTPUT_DIR_NAME);
+
+      lma.prepareDAG(new JdbcPollerApplication(), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync();
+
+      // wait for output files to roll      
+      Thread.sleep(5000);
+
+      String[] extensions = { "dat.0", "tmp" };
+      Collection<File> list = FileUtils.listFiles(new File(OUTPUT_DIR_NAME), extensions, false);
+      int recordsCount = 0;
+      for (File file : list) {
+        recordsCount += FileUtils.readLines(file).size();
+      }
+      Assert.assertEquals("Records in file", 10, recordsCount);
+
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+}
diff --git a/examples/jdbcIngest/src/test/resources/example.sql b/examples/jdbcIngest/src/test/resources/example.sql
new file mode 100644
index 0000000..531c659
--- /dev/null
+++ b/examples/jdbcIngest/src/test/resources/example.sql
@@ -0,0 +1,24 @@
+DROP DATABASE IF EXISTS testDev;
+
+CREATE DATABASE testDev;
+
+USE testDev;
+
+CREATE TABLE IF NOT EXISTS `test_event_table` (
+  `ACCOUNT_NO` int(11) NOT NULL,
+  `NAME` varchar(255) DEFAULT NULL,
+  `AMOUNT` int(11) DEFAULT NULL,
+  primary key(`ACCOUNT_NO`)
+) ENGINE=MyISAM  DEFAULT CHARSET=latin1;
+
+INSERT INTO `test_event_table` (`ACCOUNT_NO`, `NAME`, `AMOUNT`) VALUES
+(1, 'User1', 1000),
+(2, 'User2', 2000),
+(3, 'User3', 3000),
+(4, 'User4', 4000),
+(5, 'User5', 5000),
+(6, 'User6', 6000),
+(7, 'User7', 7000),
+(8, 'User8', 8000),
+(9, 'User9', 9000),
+(10, 'User10', 1000);
diff --git a/examples/jdbcIngest/src/test/resources/log4j.properties b/examples/jdbcIngest/src/test/resources/log4j.properties
new file mode 100644
index 0000000..3bfcdc5
--- /dev/null
+++ b/examples/jdbcIngest/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
diff --git a/examples/jdbcToJdbc/.gitignore b/examples/jdbcToJdbc/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/examples/jdbcToJdbc/.gitignore
@@ -0,0 +1 @@
+/target/
diff --git a/examples/jdbcToJdbc/README.md b/examples/jdbcToJdbc/README.md
new file mode 100644
index 0000000..562de69
--- /dev/null
+++ b/examples/jdbcToJdbc/README.md
@@ -0,0 +1,55 @@
+## JdbcToJdbc App
+
+This application reads records from a source table in MySQL, creates POJOs and writes them to another table in MySQL.
+
+Follow these steps to run this application:
+
+**Step 1**: Update these properties in the properties file `src/site/conf/example.xml`:
+
+| Property Name  | Description |
+| -------------  | ----------- |
+| dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.store.databaseUrl | database URL of the form `jdbc:mysql://hostName:portNumber/dbName` |
+| dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.store.userName | MySQL user name |
+| dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.store.password | MySQL user password |
+| dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.store.databaseUrl | database URL of the form `jdbc:mysql://hostName:portNumber/dbName` |
+| dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.store.userName | MySQL user name |
+| dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.store.password | MySQL user password |
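+
+For example, the source database URL entry in `example.xml` might look like this (host, port and database name are placeholders; substitute your own):
+
+    <property>
+      <name>dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.store.databaseUrl</name>
+      <value>jdbc:mysql://localhost:3306/testDev</value>
+    </property>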
+
+**Step 2**: Create database, table and add entries
+
+Go to the MySQL console and run (where _{path}_ is a suitable prefix):
+
+    mysql> source {path}/src/test/resources/example.sql
+
+After this, please verify that `testDev.test_event_table` is created and has 10 rows. The script also creates an output table named `testDev.test_output_event_table`:
+
+    mysql> select count(*) from testDev.test_event_table;
+    +----------+
+    | count(*) |
+    +----------+
+    |       10 |
+    +----------+
+
+**Step 3**: Build the code:
+
+    shell> mvn clean install
+
+Upload the `target/jdbcToJdbc-1.0-SNAPSHOT.apa` to the gateway.
+
+**Step 4**: During launch use the "Specify custom properties" option and select `example.xml`.
+
+**Verification**:
+
+Log on to the MySQL console and verify that the output table received the 10 rows:
+
+    mysql> select count(*) from testDev.test_output_event_table;
+    +----------+
+    | count(*) |
+    +----------+
+    |       10 |
+    +----------+
+
+
+
diff --git a/examples/jdbcToJdbc/XmlJavadocCommentsExtractor.xsl b/examples/jdbcToJdbc/XmlJavadocCommentsExtractor.xsl
new file mode 100644
index 0000000..08075a9
--- /dev/null
+++ b/examples/jdbcToJdbc/XmlJavadocCommentsExtractor.xsl
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+            http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+
+<!--
+    Document   : XmlJavadocCommentsExtractor.xsl
+    Created on : September 16, 2014, 11:30 AM
+    Description:
+        The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet.
+-->
+
+<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
+  <xsl:output method="xml" standalone="yes"/>
+
+  <!-- copy xml by selecting only the following nodes, attributes and text -->
+  <xsl:template match="node()|text()|@*">
+    <xsl:copy>
+      <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
+    </xsl:copy>
+  </xsl:template>
+
+  <!-- Strip off the following paths from the selected xml -->
+  <xsl:template match="//root/package/interface/interface
+                      |//root/package/interface/method/@qualified
+                      |//root/package/class/interface
+                      |//root/package/class/class
+                      |//root/package/class/method/@qualified
+                      |//root/package/class/field/@qualified" />
+
+  <xsl:strip-space elements="*"/>
+</xsl:stylesheet>
diff --git a/examples/jdbcToJdbc/pom.xml b/examples/jdbcToJdbc/pom.xml
new file mode 100644
index 0000000..8ed69d8
--- /dev/null
+++ b/examples/jdbcToJdbc/pom.xml
@@ -0,0 +1,319 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.example</groupId>
+  <version>1.0-SNAPSHOT</version>
+  <artifactId>jdbcToJdbc</artifactId>
+  <packaging>jar</packaging>
+
+  <!-- change these to the appropriate values -->
+  <name>JDBC To JDBC</name>
+  <description>Example use of JDBC input and output operators</description>
+
+  <properties>
+    <!-- change this if you desire to use a different version of Apex Core -->
+    <apex.version>3.5.0</apex.version>
+    <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
+    <malhar.version>3.6.0</malhar.version>
+    <datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
+  </properties>
+  <repositories>
+    <repository>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+      <id>Datatorrent-Releases</id>
+      <name>DataTorrent Release Repository</name>
+      <url>https://www.datatorrent.com/maven/content/repositories/releases/</url>
+    </repository>
+  </repositories>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-eclipse-plugin</artifactId>
+        <version>2.9</version>
+        <configuration>
+          <downloadSources>true</downloadSources>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.3</version>
+        <configuration>
+          <encoding>UTF-8</encoding>
+          <source>1.7</source>
+          <target>1.7</target>
+          <debug>true</debug>
+          <optimize>false</optimize>
+          <showDeprecation>true</showDeprecation>
+          <showWarnings>true</showWarnings>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>2.8</version>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>prepare-package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>target/deps</outputDirectory>
+              <includeScope>runtime</includeScope>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>app-package-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
+              <appendAssemblyId>false</appendAssemblyId>
+              <descriptors>
+                <descriptor>src/assemble/appPackage.xml</descriptor>
+              </descriptors>
+              <archiverConfig>
+                <defaultDirectoryMode>0755</defaultDirectoryMode>
+              </archiverConfig>
+              <archive>
+                <manifestEntries>
+                  <Class-Path>${apex.apppackage.classpath}</Class-Path>
+                  <DT-Engine-Version>${apex.version}</DT-Engine-Version>
+                  <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
+                  <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
+                  <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
+                  <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
+                  <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
+                </manifestEntries>
+              </archive>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <version>1.7</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <configuration>
+              <target>
+                <move
+                  file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
+                  tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
+              </target>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+          <execution>
+            <!-- create resource directory for xml javadoc -->
+            <id>createJavadocDirectory</id>
+            <phase>generate-resources</phase>
+            <configuration>
+              <tasks>
+                <delete
+                  dir="${project.build.directory}/generated-resources/xml-javadoc" />
+                <mkdir
+                  dir="${project.build.directory}/generated-resources/xml-javadoc" />
+              </tasks>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.9.1</version>
+        <executions>
+          <execution>
+            <id>attach-artifacts</id>
+            <phase>package</phase>
+            <goals>
+              <goal>attach-artifact</goal>
+            </goals>
+            <configuration>
+              <artifacts>
+                <artifact>
+                  <file>target/${project.artifactId}-${project.version}.apa</file>
+                  <type>apa</type>
+                </artifact>
+              </artifacts>
+              <skipAttach>false</skipAttach>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- generate javadoc -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <executions>
+          <!-- generate xml javadoc -->
+          <execution>
+            <id>xml-doclet</id>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>javadoc</goal>
+            </goals>
+            <configuration>
+              <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
+              <additionalparam>-d
+                ${project.build.directory}/generated-resources/xml-javadoc
+                -filename
+                ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
+              <useStandardDocletOptions>false</useStandardDocletOptions>
+              <docletArtifact>
+                <groupId>com.github.markusbernhardt</groupId>
+                <artifactId>xml-doclet</artifactId>
+                <version>1.0.4</version>
+              </docletArtifact>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- Transform xml javadoc to stripped down version containing only 
+        class/interface comments and tags -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>xml-maven-plugin</artifactId>
+        <version>1.0</version>
+        <executions>
+          <execution>
+            <id>transform-xmljavadoc</id>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>transform</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <transformationSets>
+            <transformationSet>
+              <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
+              <includes>
+                <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+              </includes>
+              <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
+              <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
+            </transformationSet>
+          </transformationSets>
+        </configuration>
+      </plugin>
+      <!-- copy xml javadoc to class jar -->
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <version>2.6</version>
+        <executions>
+          <execution>
+            <id>copy-resources</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${basedir}/target/classes</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
+                  <includes>
+                    <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+                  </includes>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+
+  </build>
+
+  <dependencies>
+    <!-- add your dependencies here -->
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${malhar.version}</version>
+      <!-- All transitive dependencies of malhar-library are excluded below to
+        reduce the size of the app package. Remove the exclusions if your
+        application relies on operators that need them. -->
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>apex-common</artifactId>
+      <version>${apex.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.10</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>apex-engine</artifactId>
+      <version>${apex.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <version>5.1.36</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.codehaus.janino</groupId>
+      <artifactId>janino</artifactId>
+      <version>2.7.8</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.codehaus.janino</groupId>
+      <artifactId>commons-compiler</artifactId>
+      <version>2.7.8</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hsqldb</groupId>
+      <artifactId>hsqldb</artifactId>
+      <version>2.3.1</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/examples/jdbcToJdbc/src/assemble/appPackage.xml b/examples/jdbcToJdbc/src/assemble/appPackage.xml
new file mode 100644
index 0000000..7ad071c
--- /dev/null
+++ b/examples/jdbcToJdbc/src/assemble/appPackage.xml
@@ -0,0 +1,43 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
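+  <!-- Layout of the .apa package: the application jar goes under /app,
+    dependency jars under /lib, optional XML configs under /conf, descriptors
+    under /META-INF, and additional files under /resources. -->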
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/resources</directory>
+      <outputDirectory>/resources</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+
diff --git a/examples/jdbcToJdbc/src/main/java/com/example/mydtapp/JdbcToJdbcApp.java b/examples/jdbcToJdbc/src/main/java/com/example/mydtapp/JdbcToJdbcApp.java
new file mode 100644
index 0000000..6dffa87
--- /dev/null
+++ b/examples/jdbcToJdbc/src/main/java/com/example/mydtapp/JdbcToJdbcApp.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.example.mydtapp;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcStore;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+
+@ApplicationAnnotation(name = "JdbcToJdbcApp")
+public class JdbcToJdbcApp implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    JdbcPOJOInputOperator jdbcInputOperator = dag.addOperator("JdbcInput", new JdbcPOJOInputOperator());
+    JdbcStore store = new JdbcStore();
+    jdbcInputOperator.setStore(store);
+    jdbcInputOperator.setFieldInfos(addFieldInfos());
+
+    /**
+     * Update the class below to a user-defined class matching the input table
+     * schema, and update addFieldInfos() accordingly. Alternatively, leave this
+     * line commented out and set the TUPLE_CLASS port attribute from the
+     * properties file, as is done in this example.
+     */
+    //dag.setOutputPortAttribute(jdbcInputOperator.outputPort, Context.PortContext.TUPLE_CLASS, PojoEvent.class);
+
+    JdbcPOJOInsertOutputOperator jdbcOutputOperator = dag.addOperator("JdbcOutput", new JdbcPOJOInsertOutputOperator());
+    JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
+    jdbcOutputOperator.setStore(outputStore);
+    jdbcOutputOperator.setFieldInfos(addJdbcFieldInfos());
+
+    /**
+     * Update the class below to a user-defined class matching the output table
+     * schema, and update addJdbcFieldInfos() accordingly. Alternatively, leave
+     * this line commented out and set the TUPLE_CLASS port attribute from the
+     * properties file, as is done in this example.
+     */
+    //dag.setInputPortAttribute(jdbcOutputOperator.input, Context.PortContext.TUPLE_CLASS, PojoEvent.class);
+
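+    // CONTAINER_LOCAL deploys both operators in the same container, so tuples
+    // are passed through an in-memory queue rather than over the network.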
+    dag.addStream("POJO's", jdbcInputOperator.outputPort, jdbcOutputOperator.input)
+        .setLocality(Locality.CONTAINER_LOCAL);
+  }
+
+  /**
+   * Modify this method to define the field mappings for a user-defined
+   * class.<br>
+   * A SQL type from java.sql.Types can be supplied as the additional last
+   * parameter of each JdbcFieldInfo.
+   */
+  private List<com.datatorrent.lib.db.jdbc.JdbcFieldInfo> addJdbcFieldInfos()
+  {
+    List<com.datatorrent.lib.db.jdbc.JdbcFieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new com.datatorrent.lib.db.jdbc.JdbcFieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER,0));
+    fieldInfos.add(new com.datatorrent.lib.db.jdbc.JdbcFieldInfo("NAME", "name", SupportType.STRING,0));
+    fieldInfos.add(new com.datatorrent.lib.db.jdbc.JdbcFieldInfo("AMOUNT", "amount", SupportType.INTEGER,0));
+    return fieldInfos;
+  }
+  
+  /**
+   * Modify this method to define the field mappings for a user-defined class.
+   */
+  private List<FieldInfo> addFieldInfos()
+  {
+    List<FieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new FieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER));
+    fieldInfos.add(new FieldInfo("NAME", "name", SupportType.STRING));
+    fieldInfos.add(new FieldInfo("AMOUNT", "amount", SupportType.INTEGER));
+    return fieldInfos;
+  }
+
+}
diff --git a/examples/jdbcToJdbc/src/main/java/com/example/mydtapp/PojoEvent.java b/examples/jdbcToJdbc/src/main/java/com/example/mydtapp/PojoEvent.java
new file mode 100644
index 0000000..5154db3
--- /dev/null
+++ b/examples/jdbcToJdbc/src/main/java/com/example/mydtapp/PojoEvent.java
@@ -0,0 +1,44 @@
+package com.example.mydtapp;
+
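+/**
+ * POJO representing one row of the event table. The field names must match
+ * the pojoFieldExpression values used in the FieldInfo mappings of
+ * JdbcToJdbcApp.
+ */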
+public class PojoEvent
+{
+  @Override
+  public String toString()
+  {
+    return "TestPOJOEvent [accountNumber=" + accountNumber + ", name=" + name + ", amount=" + amount + "]";
+  }
+
+  private int accountNumber;
+  private String name;
+  private int amount;
+
+  public int getAccountNumber()
+  {
+    return accountNumber;
+  }
+
+  public void setAccountNumber(int accountNumber)
+  {
+    this.accountNumber = accountNumber;
+  }
+
+  public String getName()
+  {
+    return name;
+  }
+
+  public void setName(String name)
+  {
+    this.name = name;
+  }
+
+  public int getAmount()
+  {
+    return amount;
+  }
+
+  public void setAmount(int amount)
+  {
+    this.amount = amount;
+  }
+}
diff --git a/examples/jdbcToJdbc/src/main/resources/META-INF/properties.xml b/examples/jdbcToJdbc/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..904d297
--- /dev/null
+++ b/examples/jdbcToJdbc/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0"?>
+<configuration>
+  <!-- Property template:
+    <property>
+      <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
+      <value>some-default-value (if no value is specified, it must be supplied
+        by the user or a custom config when launching)</value>
+    </property>
+  -->
+  <!-- memory assigned to app master:
+    <property>
+      <name>dt.attr.MASTER_MEMORY_MB</name>
+      <value>1024</value>
+    </property>
+  -->
+
+  <!-- JDBC driver in use -->
+  <property>
+    <name>dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.store.databaseDriver
+    </name>
+    <value>org.hsqldb.jdbcDriver</value>
+  </property>
+
+  <!-- URL to connect to the DB master -->
+  <property>
+    <name>dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.store.databaseUrl
+    </name>
+    <value>jdbc:hsqldb:mem:test</value>
+  </property>
+
+  <!-- # rows that the operator can retrieve in a window -->
+  <property>
+    <name>dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.fetchSize
+    </name>
+    <value>120</value>
+  </property>
+
+  <!-- POJO class -->
+  <property>
+    <name>dt.application.JdbcToJdbcApp.operator.JdbcInput.port.outputPort.attr.TUPLE_CLASS
+    </name>
+    <value>com.example.mydtapp.PojoEvent</value>
+  </property>
+
+  <!-- Query to fetch data -->
+  <property>
+    <name>dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.query
+    </name>
+    <value>select * from test_event_table
+    </value>
+  </property>
+
+  <!-- Input Table name -->
+  <property>
+    <name>dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.tableName
+    </name>
+    <value>test_event_table</value>
+  </property>
+
+  <!-- JDBC driver in use -->
+  <property>
+    <name>dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.store.databaseDriver
+    </name>
+    <value>org.hsqldb.jdbcDriver</value>
+  </property>
+
+  <!-- URL to connect to the DB master -->
+  <property>
+    <name>dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.store.databaseUrl
+    </name>
+    <value>jdbc:hsqldb:mem:test</value>
+  </property>
+
+  <!-- # rows written to the database per batch -->
+  <property>
+    <name>dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.batchSize
+    </name>
+    <value>5</value>
+  </property>
+
+  <!-- Output Table name -->
+  <property>
+    <name>dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.tablename
+    </name>
+    <value>test_output_event_table</value>
+  </property>
+  
+  <!-- POJO class -->
+  <property>
+    <name>dt.application.JdbcToJdbcApp.operator.JdbcOutput.port.input.attr.TUPLE_CLASS
+    </name>
+    <value>com.example.mydtapp.PojoEvent</value>
+  </property>
+
+</configuration>
+
diff --git a/examples/jdbcToJdbc/src/test/java/com/example/mydtapp/ApplicationTest.java b/examples/jdbcToJdbc/src/test/java/com/example/mydtapp/ApplicationTest.java
new file mode 100644
index 0000000..ea4c345
--- /dev/null
+++ b/examples/jdbcToJdbc/src/test/java/com/example/mydtapp/ApplicationTest.java
@@ -0,0 +1,42 @@
+/**
+ * Put your copyright and license info here.
+ */
+package com.example.mydtapp;
+
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Tests the DAG declaration in local mode.<br>
+ * This test assumes that test_event_table, the meta table, and
+ * test_output_event_table have already been created (see
+ * src/test/resources/example.sql).
+ */
+public class ApplicationTest
+{
+
+  @Test
+  @Ignore
+  public void testApplication() throws Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+      lma.prepareDAG(new JdbcToJdbcApp(), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.run(50000); // runs for 50 seconds and quits
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+}
diff --git a/examples/jdbcToJdbc/src/test/java/com/example/mydtapp/JdbcOperatorTest.java b/examples/jdbcToJdbc/src/test/java/com/example/mydtapp/JdbcOperatorTest.java
new file mode 100644
index 0000000..f4709ba
--- /dev/null
+++ b/examples/jdbcToJdbc/src/test/java/com/example/mydtapp/JdbcOperatorTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.example.mydtapp;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.db.jdbc.AbstractJdbcInputOperator;
+import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+
+/**
+ * Tests for {@link AbstractJdbcTransactionableOutputOperator} and
+ * {@link AbstractJdbcInputOperator}
+ */
+public class JdbcOperatorTest
+{
+  public static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
+  public static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+
+  private static final String TABLE_NAME = "test_event_table";
+  private static final String OUTPUT_TABLE_NAME = "test_output_event_table";
+
+  @BeforeClass
+  public static void setup()
+  {
+
+    try {
+      Class.forName(DB_DRIVER).newInstance();
+
+      Connection con = DriverManager.getConnection(URL);
+      Statement stmt = con.createStatement();
+
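+      // The meta table tracks the last committed window per (application,
+      // operator) pair; JdbcTransactionalStore uses it to keep writes
+      // exactly-once across operator restarts.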
+      String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+          + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+          + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+          + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, " + "UNIQUE ("
+          + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", "
+          + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " + ")";
+      
+      System.out.println(createMetaTable);
+      stmt.executeUpdate(createMetaTable);
+
+      String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+          + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)";
+      stmt.executeUpdate(createTable);
+      insertEventsInTable(10, 0);
+
+      String createOutputTable = "CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE_NAME
+          + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)";
+      stmt.executeUpdate(createOutputTable);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static void cleanTable()
+  {
+    try {
+      Connection con = DriverManager.getConnection(URL);
+      Statement stmt = con.createStatement();
+      String cleanTable = "delete from " + TABLE_NAME;
+      stmt.executeUpdate(cleanTable);
+      String cleanOutputTable = "delete from " + OUTPUT_TABLE_NAME;
+      stmt.executeUpdate(cleanOutputTable);
+
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static void insertEventsInTable(int numEvents, int offset)
+  {
+    try {
+      Connection con = DriverManager.getConnection(URL);
+      String insert = "insert into " + TABLE_NAME + " values (?,?,?)";
+      PreparedStatement stmt = con.prepareStatement(insert);
+      for (int i = 0; i < numEvents; i++, offset++) {
+        stmt.setInt(1, offset);
+        stmt.setString(2, "Account_Holder-" + offset);
+        stmt.setInt(3, (offset * 1000));
+        stmt.executeUpdate();
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public int getNumOfEventsInStore()
+  {
+    Connection con;
+    try {
+      con = DriverManager.getConnection(URL);
+      Statement stmt = con.createStatement();
+
+      String countQuery = "SELECT count(*) from " + OUTPUT_TABLE_NAME;
+      ResultSet resultSet = stmt.executeQuery(countQuery);
+      resultSet.next();
+      return resultSet.getInt(1);
+    } catch (SQLException e) {
+      throw new RuntimeException("fetching count", e);
+    }
+  }
+
+  @Test
+  public void testApplication() throws Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+      lma.prepareDAG(new JdbcToJdbcApp(), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync();
+
+      // wait for records to be added to the output table
+      Thread.sleep(5000);
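+      // Note: a fixed sleep keeps this test simple; polling
+      // getNumOfEventsInStore() with a timeout would be less timing-sensitive.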
+
+      Assert.assertEquals("Events in store", 10, getNumOfEventsInStore());
+      cleanTable();
+
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+}
diff --git a/examples/jdbcToJdbc/src/test/resources/example.sql b/examples/jdbcToJdbc/src/test/resources/example.sql
new file mode 100644
index 0000000..104240c
--- /dev/null
+++ b/examples/jdbcToJdbc/src/test/resources/example.sql
@@ -0,0 +1,36 @@
+DROP DATABASE IF EXISTS testDev;
+
+CREATE DATABASE testDev;
+
+USE testDev;
+
+CREATE TABLE IF NOT EXISTS `test_event_table` (
+  `ACCOUNT_NO` int(11) NOT NULL,
+  `NAME` varchar(255) DEFAULT NULL,
+  `AMOUNT` int(11) DEFAULT NULL
+) ENGINE=MyISAM  DEFAULT CHARSET=latin1;
+
+INSERT INTO `test_event_table` (`ACCOUNT_NO`, `NAME`, `AMOUNT`) VALUES
+(1, 'User1', 1000),
+(2, 'User2', 2000),
+(3, 'User3', 3000),
+(4, 'User4', 4000),
+(5, 'User5', 5000),
+(6, 'User6', 6000),
+(7, 'User7', 7000),
+(8, 'User8', 8000),
+(9, 'User9', 9000),
+(10, 'User10', 1000);
+
+CREATE TABLE IF NOT EXISTS `test_output_event_table` (
+  `ACCOUNT_NO` int(11) NOT NULL,
+  `NAME` varchar(255) DEFAULT NULL,
+  `AMOUNT` int(11) DEFAULT NULL
+) ENGINE=MyISAM  DEFAULT CHARSET=latin1;
+
+CREATE TABLE IF NOT EXISTS `dt_meta` ( 
+  `dt_app_id` VARCHAR(100) NOT NULL, 
+  `dt_operator_id` INT NOT NULL, 
+  `dt_window` BIGINT NOT NULL, 
+UNIQUE (`dt_app_id`, `dt_operator_id`, `dt_window`)
+) ENGINE=MyISAM  DEFAULT CHARSET=latin1;
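+
+-- After the application has run, the copy can be verified with, for example:
+--   SELECT count(*) FROM testDev.test_output_event_table;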
diff --git a/examples/jdbcToJdbc/src/test/resources/log4j.properties b/examples/jdbcToJdbc/src/test/resources/log4j.properties
new file mode 100644
index 0000000..3bfcdc5
--- /dev/null
+++ b/examples/jdbcToJdbc/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug