JdbcInput and HDFS output example app
SPOI-8251 jdbc to jdbc app
diff --git a/examples/jdbcIngest/.gitignore b/examples/jdbcIngest/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/examples/jdbcIngest/.gitignore
@@ -0,0 +1 @@
+/target/
diff --git a/examples/jdbcIngest/README.md b/examples/jdbcIngest/README.md
new file mode 100644
index 0000000..ec01985
--- /dev/null
+++ b/examples/jdbcIngest/README.md
@@ -0,0 +1,65 @@
+## Sample mysql implementation
+
+This project contains two applications to read records from a table in `MySQL`, create POJOs and write them to a file
+in the user specified directory in HDFS.
+
+1. SimpleJdbcToHDFSApp: Reads table records as per given query and emits them as POJOs.
+2. PollJdbcToHDFSApp: Reads table records using partitions in parallel fashion also polls for newly **appended** records and emits them as POJOs.
+
+Follow these steps to run these applications:
+
+**Step 1**: Update these properties in the file `src/main/resources/META_INF/properties-<applicationName>.xml`:
+
+| Property Name | Description |
+| ------------- | ----------- |
+| dt.application.<applicationName>.operator.JdbcInput.prop.store.databaseUrl | database URL of the form `jdbc:mysql://hostName:portNumber/dbName` |
+| dt.application.<applicationName>.operator.JdbcInput.prop.store.userName | MySQL user name |
+| dt.application.<applicationName>.operator.JdbcInput.prop.store.password | MySQL user password |
+| dt.application.<applicationName>.operator.FileOutputOperator.filePath | HDFS output directory path |
+
+**Step 2**: Create database table and add entries
+
+Go to the MySQL console and run (where _{path}_ is a suitable prefix):
+
+ mysql> source {path}/src/test/resources/example.sql
+
+After this, please verify that `testDev.test_event_table` is created and has 10 rows:
+
+ mysql> select count(*) from testDev.test_event_table;
+ +----------+
+ | count(*) |
+ +----------+
+ | 10 |
+ +----------+
+
+**Step 3**: Create HDFS output directory if not already present (_{path}_ should be the same as specified in `META_INF/properties-<applicationName>.xml`):
+
+ hadoop fs -mkdir -p {path}
+
+**Step 4**: Build the code:
+
+ shell> mvn clean install
+
+Upload the `target/jdbcInput-1.0-SNAPSHOT.apa` to the UI console if available or launch it from
+the commandline using `apexcli`.
+
+**Step 5**: During launch use `src/main/resources/META_INF/properties-<applicationName>.xml` as a custom configuration file; then verify
+that the output directory has the expected output:
+
+ shell> hadoop fs -cat <hadoop directory path>/2_op.dat.* | wc -l
+
+This should return 10 as the count.
+
+Sample Output:
+
+ hadoop fs -cat <path_to_file>/2_op.dat.0
+ PojoEvent [accountNumber=1, name=User1, amount=1000]
+ PojoEvent [accountNumber=2, name=User2, amount=2000]
+ PojoEvent [accountNumber=3, name=User3, amount=3000]
+ PojoEvent [accountNumber=4, name=User4, amount=4000]
+ PojoEvent [accountNumber=5, name=User5, amount=5000]
+ PojoEvent [accountNumber=6, name=User6, amount=6000]
+ PojoEvent [accountNumber=7, name=User7, amount=7000]
+ PojoEvent [accountNumber=8, name=User8, amount=8000]
+ PojoEvent [accountNumber=9, name=User9, amount=9000]
+ PojoEvent [accountNumber=10, name=User10, amount=1000]
diff --git a/examples/jdbcIngest/XmlJavadocCommentsExtractor.xsl b/examples/jdbcIngest/XmlJavadocCommentsExtractor.xsl
new file mode 100644
index 0000000..08075a9
--- /dev/null
+++ b/examples/jdbcIngest/XmlJavadocCommentsExtractor.xsl
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+
+<!--
+ Document : XmlJavadocCommentsExtractor.xsl
+ Created on : September 16, 2014, 11:30 AM
+ Description:
+ The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet.
+-->
+
+<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
+ <xsl:output method="xml" standalone="yes"/>
+
+ <!-- copy xml by selecting only the following nodes, attributes and text -->
+ <xsl:template match="node()|text()|@*">
+ <xsl:copy>
+ <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
+ </xsl:copy>
+ </xsl:template>
+
+ <!-- Strip off the following paths from the selected xml -->
+ <xsl:template match="//root/package/interface/interface
+ |//root/package/interface/method/@qualified
+ |//root/package/class/interface
+ |//root/package/class/class
+ |//root/package/class/method/@qualified
+ |//root/package/class/field/@qualified" />
+
+ <xsl:strip-space elements="*"/>
+</xsl:stylesheet>
diff --git a/examples/jdbcIngest/pom.xml b/examples/jdbcIngest/pom.xml
new file mode 100644
index 0000000..f9288b8
--- /dev/null
+++ b/examples/jdbcIngest/pom.xml
@@ -0,0 +1,298 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.example</groupId>
+ <version>1.0-SNAPSHOT</version>
+ <artifactId>jdbcInput</artifactId>
+ <packaging>jar</packaging>
+
+ <!-- change these to the appropriate values -->
+ <name>JDBC Input Operator</name>
+ <description>Example Uses of JDBC Input Operator</description>
+
+ <properties>
+ <!-- change this if you desire to use a different version of Apex Core -->
+ <apex.version>3.5.0</apex.version>
+ <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
+ <malhar.version>3.6.0</malhar.version>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-eclipse-plugin</artifactId>
+ <version>2.9</version>
+ <configuration>
+ <downloadSources>true</downloadSources>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.3</version>
+ <configuration>
+ <encoding>UTF-8</encoding>
+ <source>1.7</source>
+ <target>1.7</target>
+ <debug>true</debug>
+ <optimize>false</optimize>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.8</version>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <phase>prepare-package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>target/deps</outputDirectory>
+ <includeScope>runtime</includeScope>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>app-package-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
+ <appendAssemblyId>false</appendAssemblyId>
+ <descriptors>
+ <descriptor>src/assemble/appPackage.xml</descriptor>
+ </descriptors>
+ <archiverConfig>
+ <defaultDirectoryMode>0755</defaultDirectoryMode>
+ </archiverConfig>
+ <archive>
+ <manifestEntries>
+ <Class-Path>${apex.apppackage.classpath}</Class-Path>
+ <DT-Engine-Version>${apex.version}</DT-Engine-Version>
+ <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
+ <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
+ <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
+ <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
+ <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <configuration>
+ <target>
+ <move
+ file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
+ tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
+ </target>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ <execution>
+ <!-- create resource directory for xml javadoc -->
+ <id>createJavadocDirectory</id>
+ <phase>generate-resources</phase>
+ <configuration>
+ <tasks>
+ <delete
+ dir="${project.build.directory}/generated-resources/xml-javadoc" />
+ <mkdir
+ dir="${project.build.directory}/generated-resources/xml-javadoc" />
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.9.1</version>
+ <executions>
+ <execution>
+ <id>attach-artifacts</id>
+ <phase>package</phase>
+ <goals>
+ <goal>attach-artifact</goal>
+ </goals>
+ <configuration>
+ <artifacts>
+ <artifact>
+ <file>target/${project.artifactId}-${project.version}.apa</file>
+ <type>apa</type>
+ </artifact>
+ </artifacts>
+ <skipAttach>false</skipAttach>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- generate javdoc -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <executions>
+ <!-- generate xml javadoc -->
+ <execution>
+ <id>xml-doclet</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>javadoc</goal>
+ </goals>
+ <configuration>
+ <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
+ <additionalparam>-d
+ ${project.build.directory}/generated-resources/xml-javadoc
+ -filename
+ ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
+ <useStandardDocletOptions>false</useStandardDocletOptions>
+ <docletArtifact>
+ <groupId>com.github.markusbernhardt</groupId>
+ <artifactId>xml-doclet</artifactId>
+ <version>1.0.4</version>
+ </docletArtifact>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- Transform xml javadoc to stripped down version containing only
+ class/interface comments and tags -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>xml-maven-plugin</artifactId>
+ <version>1.0</version>
+ <executions>
+ <execution>
+ <id>transform-xmljavadoc</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>transform</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <transformationSets>
+ <transformationSet>
+ <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
+ <includes>
+ <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+ </includes>
+ <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
+ <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
+ </transformationSet>
+ </transformationSets>
+ </configuration>
+ </plugin>
+ <!-- copy xml javadoc to class jar -->
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>2.6</version>
+ <executions>
+ <execution>
+ <id>copy-resources</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${basedir}/target/classes</outputDirectory>
+ <resources>
+ <resource>
+ <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
+ <includes>
+ <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+ </includes>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+
+ </build>
+
+ <dependencies>
+ <!-- add your dependencies here -->
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-library</artifactId>
+ <version>${malhar.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>apex-common</artifactId>
+ <version>${apex.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.10</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>apex-engine</artifactId>
+ <version>${apex.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>5.1.36</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jooq</groupId>
+ <artifactId>jooq</artifactId>
+ <version>3.6.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>janino</artifactId>
+ <version>2.7.8</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hsqldb</groupId>
+ <artifactId>hsqldb</artifactId>
+ <version>2.3.1</version>
+ </dependency>
+
+ </dependencies>
+
+</project>
diff --git a/examples/jdbcIngest/src/assemble/appPackage.xml b/examples/jdbcIngest/src/assemble/appPackage.xml
new file mode 100644
index 0000000..7ad071c
--- /dev/null
+++ b/examples/jdbcIngest/src/assemble/appPackage.xml
@@ -0,0 +1,43 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>appPackage</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${basedir}/target/</directory>
+ <outputDirectory>/app</outputDirectory>
+ <includes>
+ <include>${project.artifactId}-${project.version}.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/target/deps</directory>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/site/conf</directory>
+ <outputDirectory>/conf</outputDirectory>
+ <includes>
+ <include>*.xml</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/META-INF</directory>
+ <outputDirectory>/META-INF</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/app</directory>
+ <outputDirectory>/app</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/resources</directory>
+ <outputDirectory>/resources</outputDirectory>
+ </fileSet>
+ </fileSets>
+
+</assembly>
+
diff --git a/examples/jdbcIngest/src/main/java/com/example/mydtapp/FileLineOutputOperator.java b/examples/jdbcIngest/src/main/java/com/example/mydtapp/FileLineOutputOperator.java
new file mode 100644
index 0000000..e155f23
--- /dev/null
+++ b/examples/jdbcIngest/src/main/java/com/example/mydtapp/FileLineOutputOperator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.example.mydtapp;
+
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
+
+public class FileLineOutputOperator extends AbstractFileOutputOperator<Object>
+{
+ @Override
+ protected String getFileName(Object input)
+ {
+ return context.getId() + "_" + "op.dat";
+ }
+
+ @Override
+ protected byte[] getBytesForTuple(Object input)
+ {
+ return (input.toString() + "\n").getBytes();
+ }
+}
diff --git a/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcHDFSApp.java b/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcHDFSApp.java
new file mode 100644
index 0000000..5605bcf
--- /dev/null
+++ b/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcHDFSApp.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.example.mydtapp;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcStore;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+
+@ApplicationAnnotation(name = "SimpleJdbcToHDFSApp")
+public class JdbcHDFSApp implements StreamingApplication
+{
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ JdbcPOJOInputOperator jdbcInputOperator = dag.addOperator("JdbcInput", new JdbcPOJOInputOperator());
+ /**
+ * The class given below can be updated to the user defined class based on
+ * input table schema The addField infos method needs to be updated
+ * accordingly This line can be commented and class can be set from the
+ * properties file
+ */
+ // dag.setOutputPortAttribute(jdbcInputOperator.outputPort, Context.PortContext.TUPLE_CLASS, PojoEvent.class);
+
+ jdbcInputOperator.setFieldInfos(addFieldInfos());
+
+ JdbcStore store = new JdbcStore();
+ jdbcInputOperator.setStore(store);
+
+ FileLineOutputOperator fileOutput = dag.addOperator("FileOutputOperator", new FileLineOutputOperator());
+
+ dag.addStream("POJO's", jdbcInputOperator.outputPort, fileOutput.input).setLocality(Locality.CONTAINER_LOCAL);
+ }
+
+ /**
+ * This method can be modified to have field mappings based on used defined
+ * class
+ */
+ private List<FieldInfo> addFieldInfos()
+ {
+ List<FieldInfo> fieldInfos = Lists.newArrayList();
+ fieldInfos.add(new FieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER));
+ fieldInfos.add(new FieldInfo("NAME", "name", SupportType.STRING));
+ fieldInfos.add(new FieldInfo("AMOUNT", "amount", SupportType.INTEGER));
+ return fieldInfos;
+ }
+
+}
diff --git a/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcPollerApplication.java b/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcPollerApplication.java
new file mode 100644
index 0000000..54d71f7
--- /dev/null
+++ b/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcPollerApplication.java
@@ -0,0 +1,48 @@
+package com.example.mydtapp;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOPollInputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcStore;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+import com.google.common.collect.Lists;
+
+@ApplicationAnnotation(name = "PollJdbcToHDFSApp")
+public class JdbcPollerApplication implements StreamingApplication
+{
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ JdbcPOJOPollInputOperator poller = dag.addOperator("JdbcPoller", new JdbcPOJOPollInputOperator());
+
+ JdbcStore store = new JdbcStore();
+ poller.setStore(store);
+
+ poller.setFieldInfos(addFieldInfos());
+
+ FileLineOutputOperator writer = dag.addOperator("Writer", new FileLineOutputOperator());
+ dag.setInputPortAttribute(writer.input, PortContext.PARTITION_PARALLEL, true);
+ writer.setRotationWindows(60);
+
+ dag.addStream("dbrecords", poller.outputPort, writer.input);
+ }
+
+ /**
+ * This method can be modified to have field mappings based on used defined
+ * class
+ */
+ private List<FieldInfo> addFieldInfos()
+ {
+ List<FieldInfo> fieldInfos = Lists.newArrayList();
+ fieldInfos.add(new FieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER));
+ fieldInfos.add(new FieldInfo("NAME", "name", SupportType.STRING));
+ fieldInfos.add(new FieldInfo("AMOUNT", "amount", SupportType.INTEGER));
+ return fieldInfos;
+ }
+}
diff --git a/examples/jdbcIngest/src/main/java/com/example/mydtapp/PojoEvent.java b/examples/jdbcIngest/src/main/java/com/example/mydtapp/PojoEvent.java
new file mode 100644
index 0000000..f56522b
--- /dev/null
+++ b/examples/jdbcIngest/src/main/java/com/example/mydtapp/PojoEvent.java
@@ -0,0 +1,44 @@
+package com.example.mydtapp;
+
+public class PojoEvent
+{
+ @Override
+ public String toString()
+ {
+ return "PojoEvent [accountNumber=" + accountNumber + ", name=" + name + ", amount=" + amount + "]";
+ }
+
+ private int accountNumber;
+ private String name;
+ private int amount;
+
+ public int getAccountNumber()
+ {
+ return accountNumber;
+ }
+
+ public void setAccountNumber(int accountNumber)
+ {
+ this.accountNumber = accountNumber;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+
+ public int getAmount()
+ {
+ return amount;
+ }
+
+ public void setAmount(int amount)
+ {
+ this.amount = amount;
+ }
+}
diff --git a/examples/jdbcIngest/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml b/examples/jdbcIngest/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml
new file mode 100644
index 0000000..6e7aaf6
--- /dev/null
+++ b/examples/jdbcIngest/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0"?>
+<configuration>
+ <!-- Static partitioning, specify the partition count, this decides how
+ many ranges would be initiated -->
+ <property>
+ <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.partitionCount</name>
+ <value>2</value>
+ </property>
+
+ <property>
+ <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.databaseDriver</name>
+ <value>com.mysql.jdbc.Driver</value>
+ </property>
+
+ <property>
+ <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.databaseUrl</name>
+ <value>jdbc:mysql://localhost:3306/testDev</value>
+ </property>
+
+ <property>
+ <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.userName</name>
+ <value>root</value>
+ </property>
+
+ <property>
+ <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.password</name>
+ <value>mysql</value>
+ </property>
+
+ <!-- Batch size for poller -->
+ <property>
+ <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.batchSize</name>
+ <value>300</value>
+ </property>
+
+ <!-- look-up key for forming range queries, this would be the column name
+ on which the table is sorted -->
+ <property>
+ <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.key</name>
+ <value>ACCOUNT_NO</value>
+ </property>
+
+ <property>
+ <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.columnsExpression</name>
+ <value>ACCOUNT_NO,NAME,AMOUNT</value>
+ </property>
+ <property>
+ <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.port.outputPort.attr.TUPLE_CLASS</name>
+ <value>com.example.mydtapp.PojoEvent</value>
+ </property>
+
+ <!-- Table name -->
+ <property>
+ <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.tableName</name>
+ <value>test_event_table</value>
+ </property>
+
+ <property>
+ <name>dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.pollInterval</name>
+ <value>1000</value>
+ </property>
+
+ <!-- Output folder for HDFS output operator -->
+ <property>
+ <name>dt.application.PollJdbcToHDFSApp.operator.Writer.filePath</name>
+ <value>/tmp/test/output</value>
+ </property>
+
+ <property>
+ <name>dt.loggers.level</name>
+ <value>com.datatorrent.*:DEBUG,org.apache.*:INFO</value>
+ </property>
+</configuration>
diff --git a/examples/jdbcIngest/src/main/resources/META-INF/properties-SimpleJdbcToHDFSApp.xml b/examples/jdbcIngest/src/main/resources/META-INF/properties-SimpleJdbcToHDFSApp.xml
new file mode 100644
index 0000000..9fce7f8
--- /dev/null
+++ b/examples/jdbcIngest/src/main/resources/META-INF/properties-SimpleJdbcToHDFSApp.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0"?>
+<configuration>
+ <!-- <property> <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
+ <value>some-default-value (if value is not specified, it is required from
+ the user or custom config when launching)</value> </property> -->
+ <!-- memory assigned to app master <property> <name>dt.attr.MASTER_MEMORY_MB</name>
+ <value>1024</value> </property> -->
+
+ <!-- JDBC driver in use -->
+ <property>
+ <name>dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.store.databaseDriver
+ </name>
+ <value>org.hsqldb.jdbcDriver</value>
+ </property>
+
+ <!-- URL to connect to the DB master -->
+ <property>
+ <name>dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.store.databaseUrl
+ </name>
+ <value>jdbc:hsqldb:mem:test</value>
+ </property>
+
+ <!-- # rows that the operator can retrieve in a window -->
+ <property>
+ <name>dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.fetchSize
+ </name>
+ <value>50</value>
+ </property>
+
+ <!-- Query to fetch data -->
+ <property>
+ <name>dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.query
+ </name>
+ <value>select * from test_event_table
+ </value>
+ </property>
+
+ <!-- Table name -->
+ <property>
+ <name>dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.tableName
+ </name>
+ <value>test_event_table</value>
+ </property>
+
+ <!-- POJO class -->
+ <property>
+ <name>dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.port.outputPort.attr.TUPLE_CLASS
+ </name>
+ <value>com.example.mydtapp.PojoEvent</value>
+ </property>
+
+ <!-- Output folder for HDFS output operator -->
+ <property>
+ <name>dt.application.SimpleJdbcToHDFSApp.operator.FileOutputOperator.filePath
+ </name>
+ <value>/tmp/jdbcApp</value>
+ </property>
+
+ <property>
+ <name>dt.application.SimpleJdbcToHDFSApp.operator.FileOutputOperator.rotationWindows
+ </name>
+ <value>5</value>
+ </property>
+
+</configuration>
+
diff --git a/examples/jdbcIngest/src/test/java/com/example/mydtapp/ApplicationTest.java b/examples/jdbcIngest/src/test/java/com/example/mydtapp/ApplicationTest.java
new file mode 100644
index 0000000..fb78944
--- /dev/null
+++ b/examples/jdbcIngest/src/test/java/com/example/mydtapp/ApplicationTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.example.mydtapp;
+
+import java.io.IOException;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Test the DAG declaration in local mode.<br>
+ * The assumption to run this test case is that test_event_table is created
+ * already
+ */
+public class ApplicationTest
+{
+
+ @Test
+ @Ignore
+ public void testApplication() throws IOException, Exception
+ {
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SimpleJdbcToHDFSApp.xml"));
+ lma.prepareDAG(new JdbcHDFSApp(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.run(10000); // runs for 10 seconds and quits
+ } catch (ConstraintViolationException e) {
+ Assert.fail("constraint violations: " + e.getConstraintViolations());
+ }
+ }
+}
diff --git a/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcInputAppTest.java b/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcInputAppTest.java
new file mode 100644
index 0000000..1d95f4d
--- /dev/null
+++ b/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcInputAppTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.example.mydtapp;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Application test for {@link JdbcHDFSApp}
+ */
+public class JdbcInputAppTest
+{
+ private static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
+ private static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+ private static final String TABLE_NAME = "test_event_table";
+ private static final String FILE_NAME = "/tmp/jdbcApp";
+
+ @BeforeClass
+ public static void setup()
+ {
+ try {
+ cleanup();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ try {
+ Class.forName(DB_DRIVER).newInstance();
+
+ Connection con = DriverManager.getConnection(URL);
+ Statement stmt = con.createStatement();
+
+ String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+ + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)";
+ stmt.executeUpdate(createTable);
+ cleanTable();
+ insertEventsInTable(10, 0);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @AfterClass
+ public static void cleanup()
+ {
+ try {
+ FileUtils.deleteDirectory(new File(FILE_NAME));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void cleanTable()
+ {
+ try {
+ Connection con = DriverManager.getConnection(URL);
+ Statement stmt = con.createStatement();
+ String cleanTable = "delete from " + TABLE_NAME;
+ stmt.executeUpdate(cleanTable);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void insertEventsInTable(int numEvents, int offset)
+ {
+ try {
+ Connection con = DriverManager.getConnection(URL);
+ String insert = "insert into " + TABLE_NAME + " values (?,?,?)";
+ PreparedStatement stmt = con.prepareStatement(insert);
+ for (int i = 0; i < numEvents; i++, offset++) {
+ stmt.setInt(1, offset);
+ stmt.setString(2, "Account_Holder-" + offset);
+ stmt.setInt(3, (offset * 1000));
+ stmt.executeUpdate();
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testApplication() throws Exception
+ {
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SimpleJdbcToHDFSApp.xml"));
+ lma.prepareDAG(new JdbcHDFSApp(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.runAsync();
+
+ // wait for output files to roll
+ Thread.sleep(5000);
+
+ String[] extensions = { "dat.0", "tmp" };
+ Collection<File> list = FileUtils.listFiles(new File(FILE_NAME), extensions, false);
+ Assert.assertEquals("Records in file", 10, FileUtils.readLines(list.iterator().next()).size());
+
+ } catch (ConstraintViolationException e) {
+ Assert.fail("constraint violations: " + e.getConstraintViolations());
+ }
+ }
+}
diff --git a/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcPollerApplicationTest.java b/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcPollerApplicationTest.java
new file mode 100644
index 0000000..b96d4ae
--- /dev/null
+++ b/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcPollerApplicationTest.java
@@ -0,0 +1,128 @@
+package com.example.mydtapp;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+
+import javax.validation.ConstraintViolationException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datatorrent.api.LocalMode;
+
+public class JdbcPollerApplicationTest
+{
+ private static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
+ private static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+ private static final String TABLE_NAME = "test_event_table";
+ private static final String OUTPUT_DIR_NAME = "/tmp/test/output";
+
+ @BeforeClass
+ public static void setup()
+ {
+ try {
+ cleanup();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ try {
+ Class.forName(DB_DRIVER).newInstance();
+
+ Connection con = DriverManager.getConnection(URL);
+ Statement stmt = con.createStatement();
+
+ String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+ + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)";
+ stmt.executeUpdate(createTable);
+ cleanTable();
+ insertEventsInTable(10, 0);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @AfterClass
+ public static void cleanup()
+ {
+ try {
+ FileUtils.deleteDirectory(new File(OUTPUT_DIR_NAME));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void cleanTable()
+ {
+ try {
+ Connection con = DriverManager.getConnection(URL);
+ Statement stmt = con.createStatement();
+ String cleanTable = "delete from " + TABLE_NAME;
+ stmt.executeUpdate(cleanTable);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void insertEventsInTable(int numEvents, int offset)
+ {
+ try {
+ Connection con = DriverManager.getConnection(URL);
+ String insert = "insert into " + TABLE_NAME + " values (?,?,?)";
+ PreparedStatement stmt = con.prepareStatement(insert);
+ for (int i = 0; i < numEvents; i++, offset++) {
+ stmt.setInt(1, offset);
+ stmt.setString(2, "Account_Holder-" + offset);
+ stmt.setInt(3, (offset * 1000));
+ stmt.executeUpdate();
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testApplication() throws Exception
+ {
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.databaseUrl", URL);
+ conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.databaseDriver", DB_DRIVER);
+ conf.setInt("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.partitionCount", 2);
+ conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.key", "ACCOUNT_NO");
+ conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.columnsExpression", "ACCOUNT_NO,NAME,AMOUNT");
+ conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.tableName", TABLE_NAME);
+ conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.port.outputPort.attr.TUPLE_CLASS",
+ "com.example.mydtapp.PojoEvent");
+ conf.set("dt.application.PollJdbcToHDFSApp.operator.Writer.filePath", OUTPUT_DIR_NAME);
+
+ lma.prepareDAG(new JdbcPollerApplication(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.runAsync();
+
+ // wait for output files to roll
+ Thread.sleep(5000);
+
+ String[] extensions = { "dat.0", "tmp" };
+ Collection<File> list = FileUtils.listFiles(new File(OUTPUT_DIR_NAME), extensions, false);
+ int recordsCount = 0;
+ for (File file : list) {
+ recordsCount += FileUtils.readLines(file).size();
+ }
+ Assert.assertEquals("Records in file", 10, recordsCount);
+
+ } catch (ConstraintViolationException e) {
+ Assert.fail("constraint violations: " + e.getConstraintViolations());
+ }
+ }
+}
diff --git a/examples/jdbcIngest/src/test/resources/example.sql b/examples/jdbcIngest/src/test/resources/example.sql
new file mode 100644
index 0000000..531c659
--- /dev/null
+++ b/examples/jdbcIngest/src/test/resources/example.sql
@@ -0,0 +1,24 @@
+DROP DATABASE IF EXISTS testDev;
+
+CREATE DATABASE testDev;
+
+USE testDev;
+
+CREATE TABLE IF NOT EXISTS `test_event_table` (
+ `ACCOUNT_NO` int(11) NOT NULL,
+ `NAME` varchar(255) DEFAULT NULL,
+ `AMOUNT` int(11) DEFAULT NULL,
+ primary key(`ACCOUNT_NO`)
+) ENGINE=MyISAM DEFAULT CHARSET=latin1;
+
+INSERT INTO `test_event_table` (`ACCOUNT_NO`, `NAME`, `AMOUNT`) VALUES
+(1, 'User1', 1000),
+(2, 'User2', 2000),
+(3, 'User3', 3000),
+(4, 'User4', 4000),
+(5, 'User5', 5000),
+(6, 'User6', 6000),
+(7, 'User7', 7000),
+(8, 'User8', 8000),
+(9, 'User9', 9000),
+(10, 'User10', 1000);
diff --git a/examples/jdbcIngest/src/test/resources/log4j.properties b/examples/jdbcIngest/src/test/resources/log4j.properties
new file mode 100644
index 0000000..3bfcdc5
--- /dev/null
+++ b/examples/jdbcIngest/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
diff --git a/examples/jdbcToJdbc/.gitignore b/examples/jdbcToJdbc/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/examples/jdbcToJdbc/.gitignore
@@ -0,0 +1 @@
+/target/
diff --git a/examples/jdbcToJdbc/README.md b/examples/jdbcToJdbc/README.md
new file mode 100644
index 0000000..562de69
--- /dev/null
+++ b/examples/jdbcToJdbc/README.md
@@ -0,0 +1,55 @@
+JdbcToJdbc App
+
+This application reads from a source table in MySQL, creates POJO's and writes the POJO's to another table in MySQL.
+
+Steps :
+
+Step 1 : Update the below properties in the properties file - src/site/conf/example.xml
+
+1.dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.store.databaseUrl
+- data base URL of the form jdbc:mysql://hostName:portNumber/dbName
+2.dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.store.userName
+- mysql user name
+3.dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.store.password
+- password
+4.dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.store.databaseUrl
+- data base URL of the form jdbc:mysql://hostName:portNumber/dbName
+5.dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.store.userName
+- mysql user name
+6.dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.store.password
+- password
+
+Step 2: Create database, table and add entries
+
+Go to mysql console and run the below command,
+mysql> source <path to > src/test/resources/example.sql
+
+After this is done, please verify that testDev.test_event_table is created and has 10 rows.It will also create an output table by the name testDev.test_output_event_table
+
+mysql> select count(*) from testDev.test_event_table;
++----------+
+| count(*) |
++----------+
+| 10 |
++----------+
+
+Step 3: Build the code,
+shell> mvn clean install
+
+Upload the target/jdbcInput-1.0-SNAPSHOT.apa to the gateway
+
+Step 4 : During launch use "Specify custom properties" option and select example.xml
+
+Verification :
+
+Log on to the mysql console
+
+mysql> select count(*) from testDev.test_event_table;
++----------+
+| count(*) |
++----------+
+| 10 |
++----------+
+
+
+
diff --git a/examples/jdbcToJdbc/XmlJavadocCommentsExtractor.xsl b/examples/jdbcToJdbc/XmlJavadocCommentsExtractor.xsl
new file mode 100644
index 0000000..08075a9
--- /dev/null
+++ b/examples/jdbcToJdbc/XmlJavadocCommentsExtractor.xsl
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+
+<!--
+ Document : XmlJavadocCommentsExtractor.xsl
+ Created on : September 16, 2014, 11:30 AM
+ Description:
+ The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet.
+-->
+
+<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
+ <xsl:output method="xml" standalone="yes"/>
+
+ <!-- copy xml by selecting only the following nodes, attributes and text -->
+ <xsl:template match="node()|text()|@*">
+ <xsl:copy>
+ <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
+ </xsl:copy>
+ </xsl:template>
+
+ <!-- Strip off the following paths from the selected xml -->
+ <xsl:template match="//root/package/interface/interface
+ |//root/package/interface/method/@qualified
+ |//root/package/class/interface
+ |//root/package/class/class
+ |//root/package/class/method/@qualified
+ |//root/package/class/field/@qualified" />
+
+ <xsl:strip-space elements="*"/>
+</xsl:stylesheet>
diff --git a/examples/jdbcToJdbc/pom.xml b/examples/jdbcToJdbc/pom.xml
new file mode 100644
index 0000000..8ed69d8
--- /dev/null
+++ b/examples/jdbcToJdbc/pom.xml
@@ -0,0 +1,319 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.example</groupId>
+ <version>1.0-SNAPSHOT</version>
+ <artifactId>jdbcToJdbc</artifactId>
+ <packaging>jar</packaging>
+
+ <!-- change these to the appropriate values -->
+ <name>JDBC Input Operator</name>
+ <description>Example Use of JDBC Input Operator</description>
+
+ <properties>
+ <!-- change this if you desire to use a different version of Apex Core -->
+ <apex.version>3.5.0</apex.version>
+ <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
+ <malhar.version>3.6.0</malhar.version>
+ <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
+ <datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
+ </properties>
+ <repositories>
+ <repository>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ <id>Datatorrent-Releases</id>
+ <name>DataTorrent Release Repository</name>
+ <url>https://www.datatorrent.com/maven/content/repositories/releases/</url>
+ </repository>
+ </repositories>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-eclipse-plugin</artifactId>
+ <version>2.9</version>
+ <configuration>
+ <downloadSources>true</downloadSources>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.3</version>
+ <configuration>
+ <encoding>UTF-8</encoding>
+ <source>1.7</source>
+ <target>1.7</target>
+ <debug>true</debug>
+ <optimize>false</optimize>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.8</version>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <phase>prepare-package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>target/deps</outputDirectory>
+ <includeScope>runtime</includeScope>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>app-package-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
+ <appendAssemblyId>false</appendAssemblyId>
+ <descriptors>
+ <descriptor>src/assemble/appPackage.xml</descriptor>
+ </descriptors>
+ <archiverConfig>
+ <defaultDirectoryMode>0755</defaultDirectoryMode>
+ </archiverConfig>
+ <archive>
+ <manifestEntries>
+ <Class-Path>${apex.apppackage.classpath}</Class-Path>
+ <DT-Engine-Version>${apex.version}</DT-Engine-Version>
+ <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
+ <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
+ <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
+ <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
+ <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <configuration>
+ <target>
+ <move
+ file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
+ tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
+ </target>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ <execution>
+ <!-- create resource directory for xml javadoc -->
+ <id>createJavadocDirectory</id>
+ <phase>generate-resources</phase>
+ <configuration>
+ <tasks>
+ <delete
+ dir="${project.build.directory}/generated-resources/xml-javadoc" />
+ <mkdir
+ dir="${project.build.directory}/generated-resources/xml-javadoc" />
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.9.1</version>
+ <executions>
+ <execution>
+ <id>attach-artifacts</id>
+ <phase>package</phase>
+ <goals>
+ <goal>attach-artifact</goal>
+ </goals>
+ <configuration>
+ <artifacts>
+ <artifact>
+ <file>target/${project.artifactId}-${project.version}.apa</file>
+ <type>apa</type>
+ </artifact>
+ </artifacts>
+ <skipAttach>false</skipAttach>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- generate javdoc -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <executions>
+ <!-- generate xml javadoc -->
+ <execution>
+ <id>xml-doclet</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>javadoc</goal>
+ </goals>
+ <configuration>
+ <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
+ <additionalparam>-d
+ ${project.build.directory}/generated-resources/xml-javadoc
+ -filename
+ ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
+ <useStandardDocletOptions>false</useStandardDocletOptions>
+ <docletArtifact>
+ <groupId>com.github.markusbernhardt</groupId>
+ <artifactId>xml-doclet</artifactId>
+ <version>1.0.4</version>
+ </docletArtifact>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- Transform xml javadoc to stripped down version containing only
+ class/interface comments and tags -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>xml-maven-plugin</artifactId>
+ <version>1.0</version>
+ <executions>
+ <execution>
+ <id>transform-xmljavadoc</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>transform</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <transformationSets>
+ <transformationSet>
+ <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
+ <includes>
+ <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+ </includes>
+ <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
+ <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
+ </transformationSet>
+ </transformationSets>
+ </configuration>
+ </plugin>
+ <!-- copy xml javadoc to class jar -->
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>2.6</version>
+ <executions>
+ <execution>
+ <id>copy-resources</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${basedir}/target/classes</outputDirectory>
+ <resources>
+ <resource>
+ <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
+ <includes>
+ <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+ </includes>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+
+ </build>
+
+ <dependencies>
+ <!-- add your dependencies here -->
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-library</artifactId>
+ <version>${malhar.version}</version>
+ <!-- If you know that your application does not need transitive dependencies
+ pulled in by malhar-library, uncomment the following to reduce the size of
+ your app package. -->
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>apex-common</artifactId>
+ <version>${apex.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.10</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>apex-engine</artifactId>
+ <version>${apex.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>5.1.36</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>janino</artifactId>
+ <version>2.7.8</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>commons-compiler</artifactId>
+ <version>2.7.8</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hsqldb</groupId>
+ <artifactId>hsqldb</artifactId>
+ <version>2.3.1</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/examples/jdbcToJdbc/src/assemble/appPackage.xml b/examples/jdbcToJdbc/src/assemble/appPackage.xml
new file mode 100644
index 0000000..7ad071c
--- /dev/null
+++ b/examples/jdbcToJdbc/src/assemble/appPackage.xml
@@ -0,0 +1,43 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>appPackage</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${basedir}/target/</directory>
+ <outputDirectory>/app</outputDirectory>
+ <includes>
+ <include>${project.artifactId}-${project.version}.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/target/deps</directory>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/site/conf</directory>
+ <outputDirectory>/conf</outputDirectory>
+ <includes>
+ <include>*.xml</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/META-INF</directory>
+ <outputDirectory>/META-INF</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/app</directory>
+ <outputDirectory>/app</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/resources</directory>
+ <outputDirectory>/resources</outputDirectory>
+ </fileSet>
+ </fileSets>
+
+</assembly>
+
diff --git a/examples/jdbcToJdbc/src/main/java/com/example/mydtapp/JdbcToJdbcApp.java b/examples/jdbcToJdbc/src/main/java/com/example/mydtapp/JdbcToJdbcApp.java
new file mode 100644
index 0000000..6dffa87
--- /dev/null
+++ b/examples/jdbcToJdbc/src/main/java/com/example/mydtapp/JdbcToJdbcApp.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.example.mydtapp;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcStore;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.FieldInfo.SupportType;
+
+@ApplicationAnnotation(name = "JdbcToJdbcApp")
+public class JdbcToJdbcApp implements StreamingApplication
+{
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ JdbcPOJOInputOperator jdbcInputOperator = dag.addOperator("JdbcInput", new JdbcPOJOInputOperator());
+ JdbcStore store = new JdbcStore();
+ jdbcInputOperator.setStore(store);
+ jdbcInputOperator.setFieldInfos(addFieldInfos());
+
+ /**
+ * The class given below can be updated to the user defined class based on
+ * input table schema The addField infos method needs to be updated
+ * accordingly This line can be commented and class can be set from the
+ * properties file
+ */
+ //dag.setOutputPortAttribute(jdbcInputOperator.outputPort, Context.PortContext.TUPLE_CLASS, PojoEvent.class);
+
+ JdbcPOJOInsertOutputOperator jdbcOutputOperator = dag.addOperator("JdbcOutput", new JdbcPOJOInsertOutputOperator());
+ JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
+ jdbcOutputOperator.setStore(outputStore);
+ jdbcOutputOperator.setFieldInfos(addJdbcFieldInfos());
+
+ /**
+ * The class given below can be updated to the user defined class based on
+ * input table schema The addField infos method needs to be updated
+ * accordingly This line can be commented and class can be set from the
+ * properties file
+ */
+ //dag.setInputPortAttribute(jdbcOutputOperator.input, Context.PortContext.TUPLE_CLASS, PojoEvent.class);
+
+ dag.addStream("POJO's", jdbcInputOperator.outputPort, jdbcOutputOperator.input)
+ .setLocality(Locality.CONTAINER_LOCAL);
+ }
+
+ /**
+ * This method can be modified to have field mappings based on used defined
+ * class<br>
+ * User can choose to have a SQL support type as an additional paramter
+ */
+ private List<com.datatorrent.lib.db.jdbc.JdbcFieldInfo> addJdbcFieldInfos()
+ {
+ List<com.datatorrent.lib.db.jdbc.JdbcFieldInfo> fieldInfos = Lists.newArrayList();
+ fieldInfos.add(new com.datatorrent.lib.db.jdbc.JdbcFieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER,0));
+ fieldInfos.add(new com.datatorrent.lib.db.jdbc.JdbcFieldInfo("NAME", "name", SupportType.STRING,0));
+ fieldInfos.add(new com.datatorrent.lib.db.jdbc.JdbcFieldInfo("AMOUNT", "amount", SupportType.INTEGER,0));
+ return fieldInfos;
+ }
+
+ /**
+ * This method can be modified to have field mappings based on used defined
+ * class
+ */
+ private List<FieldInfo> addFieldInfos()
+ {
+ List<FieldInfo> fieldInfos = Lists.newArrayList();
+ fieldInfos.add(new FieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER));
+ fieldInfos.add(new FieldInfo("NAME", "name", SupportType.STRING));
+ fieldInfos.add(new FieldInfo("AMOUNT", "amount", SupportType.INTEGER));
+ return fieldInfos;
+ }
+
+}
diff --git a/examples/jdbcToJdbc/src/main/java/com/example/mydtapp/PojoEvent.java b/examples/jdbcToJdbc/src/main/java/com/example/mydtapp/PojoEvent.java
new file mode 100644
index 0000000..5154db3
--- /dev/null
+++ b/examples/jdbcToJdbc/src/main/java/com/example/mydtapp/PojoEvent.java
@@ -0,0 +1,44 @@
+package com.example.mydtapp;
+
+public class PojoEvent
+{
+ @Override
+ public String toString()
+ {
+ return "TestPOJOEvent [accountNumber=" + accountNumber + ", name=" + name + ", amount=" + amount + "]";
+ }
+
+ private int accountNumber;
+ private String name;
+ private int amount;
+
+ public int getAccountNumber()
+ {
+ return accountNumber;
+ }
+
+ public void setAccountNumber(int accountNumber)
+ {
+ this.accountNumber = accountNumber;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+
+ public int getAmount()
+ {
+ return amount;
+ }
+
+ public void setAmount(int amount)
+ {
+ this.amount = amount;
+ }
+}
diff --git a/examples/jdbcToJdbc/src/main/resources/META-INF/properties.xml b/examples/jdbcToJdbc/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..904d297
--- /dev/null
+++ b/examples/jdbcToJdbc/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0"?>
+<configuration>
+ <!-- <property> <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
+ <value>some-default-value (if value is not specified, it is required from
+ the user or custom config when launching)</value> </property> -->
+ <!-- memory assigned to app master <property> <name>dt.attr.MASTER_MEMORY_MB</name>
+ <value>1024</value> </property> -->
+
+ <!-- JDBC driver in use -->
+ <property>
+ <name>dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.store.databaseDriver
+ </name>
+ <value>org.hsqldb.jdbcDriver</value>
+ </property>
+
+ <!-- URL to connect to the DB master -->
+ <property>
+ <name>dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.store.databaseUrl
+ </name>
+ <value>jdbc:hsqldb:mem:test</value>
+ </property>
+
+ <!-- # rows that the operator can retrieve in a window -->
+ <property>
+ <name>dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.fetchSize
+ </name>
+ <value>120</value>
+ </property>
+
+ <!-- POJO class -->
+ <property>
+ <name>dt.application.JdbcToJdbcApp.operator.JdbcInput.port.outputPort.attr.TUPLE_CLASS
+ </name>
+ <value>com.example.mydtapp.PojoEvent</value>
+ </property>
+
+ <!-- Query to fetch data -->
+ <property>
+ <name>dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.query
+ </name>
+ <value>select * from test_event_table
+ </value>
+ </property>
+
+ <!-- Input Table name -->
+ <property>
+ <name>dt.application.JdbcToJdbcApp.operator.JdbcInput.prop.tableName
+ </name>
+ <value>test_event_table</value>
+ </property>
+
+ <!-- JDBC driver in use -->
+ <property>
+ <name>dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.store.databaseDriver
+ </name>
+ <value>org.hsqldb.jdbcDriver</value>
+ </property>
+
+ <!-- URL to connect to the DB master -->
+ <property>
+ <name>dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.store.databaseUrl
+ </name>
+ <value>jdbc:hsqldb:mem:test</value>
+ </property>
+
+ <!-- # rows that the operator can retrieve in a window -->
+ <property>
+ <name>dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.batchSize
+ </name>
+ <value>5</value>
+ </property>
+
+ <!-- Output Table name -->
+ <property>
+ <name>dt.application.JdbcToJdbcApp.operator.JdbcOutput.prop.tablename
+ </name>
+ <value>test_output_event_table</value>
+ </property>
+
+ <!-- POJO class -->
+ <property>
+ <name>dt.application.JdbcToJdbcApp.operator.JdbcOutput.port.input.attr.TUPLE_CLASS
+ </name>
+ <value>com.example.mydtapp.PojoEvent</value>
+ </property>
+
+</configuration>
+
diff --git a/examples/jdbcToJdbc/src/test/java/com/example/mydtapp/ApplicationTest.java b/examples/jdbcToJdbc/src/test/java/com/example/mydtapp/ApplicationTest.java
new file mode 100644
index 0000000..ea4c345
--- /dev/null
+++ b/examples/jdbcToJdbc/src/test/java/com/example/mydtapp/ApplicationTest.java
@@ -0,0 +1,42 @@
+/**
+ * Put your copyright and license info here.
+ */
+package com.example.mydtapp;
+
+import java.io.IOException;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Test the DAG declaration in local mode.<br>
+ * The assumption to run this test case is that test_event_table,meta-table and
+ * test_output_event_table are created already
+ */
+public class ApplicationTest
+{
+
+ @Test
+ @Ignore
+ public void testApplication() throws IOException, Exception
+ {
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+ lma.prepareDAG(new JdbcToJdbcApp(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.run(50000); // runs for 10 seconds and quits
+ } catch (ConstraintViolationException e) {
+ Assert.fail("constraint violations: " + e.getConstraintViolations());
+ }
+ }
+
+}
diff --git a/examples/jdbcToJdbc/src/test/java/com/example/mydtapp/JdbcOperatorTest.java b/examples/jdbcToJdbc/src/test/java/com/example/mydtapp/JdbcOperatorTest.java
new file mode 100644
index 0000000..f4709ba
--- /dev/null
+++ b/examples/jdbcToJdbc/src/test/java/com/example/mydtapp/JdbcOperatorTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.example.mydtapp;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.db.jdbc.AbstractJdbcInputOperator;
+import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+
+/**
+ * Tests for {@link AbstractJdbcTransactionableOutputOperator} and
+ * {@link AbstractJdbcInputOperator}
+ */
+public class JdbcOperatorTest
+{
+ public static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
+ public static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+
+ private static final String TABLE_NAME = "test_event_table";
+ private static final String OUTPUT_TABLE_NAME = "test_output_event_table";
+
+ @BeforeClass
+ public static void setup()
+ {
+
+ try {
+ Class.forName(DB_DRIVER).newInstance();
+
+ Connection con = DriverManager.getConnection(URL);
+ Statement stmt = con.createStatement();
+
+ String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+ + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+ + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+ + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, " + "UNIQUE ("
+ + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", "
+ + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " + ")";
+
+ System.out.println(createMetaTable);
+ stmt.executeUpdate(createMetaTable);
+
+ String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+ + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)";
+ stmt.executeUpdate(createTable);
+ insertEventsInTable(10, 0);
+
+ String createOutputTable = "CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE_NAME
+ + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)";
+ stmt.executeUpdate(createOutputTable);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void cleanTable()
+ {
+ try {
+ Connection con = DriverManager.getConnection(URL);
+ Statement stmt = con.createStatement();
+ String cleanTable = "delete from " + TABLE_NAME;
+ stmt.executeUpdate(cleanTable);
+ String cleanOutputTable = "delete from " + OUTPUT_TABLE_NAME;
+ stmt.executeUpdate(cleanOutputTable);
+
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void insertEventsInTable(int numEvents, int offset)
+ {
+ try {
+ Connection con = DriverManager.getConnection(URL);
+ String insert = "insert into " + TABLE_NAME + " values (?,?,?)";
+ PreparedStatement stmt = con.prepareStatement(insert);
+ for (int i = 0; i < numEvents; i++, offset++) {
+ stmt.setInt(1, offset);
+ stmt.setString(2, "Account_Holder-" + offset);
+ stmt.setInt(3, (offset * 1000));
+ stmt.executeUpdate();
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public int getNumOfEventsInStore()
+ {
+ Connection con;
+ try {
+ con = DriverManager.getConnection(URL);
+ Statement stmt = con.createStatement();
+
+ String countQuery = "SELECT count(*) from " + OUTPUT_TABLE_NAME;
+ ResultSet resultSet = stmt.executeQuery(countQuery);
+ resultSet.next();
+ return resultSet.getInt(1);
+ } catch (SQLException e) {
+ throw new RuntimeException("fetching count", e);
+ }
+ }
+
+ @Test
+ public void testApplication() throws Exception
+ {
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+ lma.prepareDAG(new JdbcToJdbcApp(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.runAsync();
+
+ // wait for records to be added to table
+ Thread.sleep(5000);
+
+ Assert.assertEquals("Events in store", 10, getNumOfEventsInStore());
+ cleanTable();
+
+ } catch (ConstraintViolationException e) {
+ Assert.fail("constraint violations: " + e.getConstraintViolations());
+ }
+ }
+}
diff --git a/examples/jdbcToJdbc/src/test/resources/example.sql b/examples/jdbcToJdbc/src/test/resources/example.sql
new file mode 100644
index 0000000..104240c
--- /dev/null
+++ b/examples/jdbcToJdbc/src/test/resources/example.sql
@@ -0,0 +1,36 @@
+DROP DATABASE IF EXISTS testDev;
+
+CREATE DATABASE testDev;
+
+USE testDev;
+
+CREATE TABLE IF NOT EXISTS `test_event_table` (
+ `ACCOUNT_NO` int(11) NOT NULL,
+ `NAME` varchar(255) DEFAULT NULL,
+ `AMOUNT` int(11) DEFAULT NULL
+) ENGINE=MyISAM DEFAULT CHARSET=latin1;
+
+INSERT INTO `test_event_table` (`ACCOUNT_NO`, `NAME`, `AMOUNT`) VALUES
+(1, 'User1', 1000),
+(2, 'User2', 2000),
+(3, 'User3', 3000),
+(4, 'User4', 4000),
+(5, 'User5', 5000),
+(6, 'User6', 6000),
+(7, 'User7', 7000),
+(8, 'User8', 8000),
+(9, 'User9', 9000),
+(10, 'User10', 1000);
+
+CREATE TABLE IF NOT EXISTS `test_output_event_table` (
+ `ACCOUNT_NO` int(11) NOT NULL,
+ `NAME` varchar(255) DEFAULT NULL,
+ `AMOUNT` int(11) DEFAULT NULL
+) ENGINE=MyISAM DEFAULT CHARSET=latin1;
+
+CREATE TABLE IF NOT EXISTS `dt_meta` (
+ `dt_app_id` VARCHAR(100) NOT NULL,
+ `dt_operator_id` INT NOT NULL,
+ `dt_window` BIGINT NOT NULL,
+UNIQUE (`dt_app_id`, `dt_operator_id`, `dt_window`)
+) ENGINE=MyISAM DEFAULT CHARSET=latin1;
diff --git a/examples/jdbcToJdbc/src/test/resources/log4j.properties b/examples/jdbcToJdbc/src/test/resources/log4j.properties
new file mode 100644
index 0000000..3bfcdc5
--- /dev/null
+++ b/examples/jdbcToJdbc/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug