blob: 574534fb64cf8e99f1d09f46d4c1061287f32f1d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.examples.JdbcIngest;
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import javax.validation.ConstraintViolationException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.LocalMode;
/**
* Application test for {@link JdbcHDFSApp}
*/
public class JdbcInputAppTest
{
private static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
private static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
private static final String TABLE_NAME = "test_event_table";
private static final String FILE_NAME = "/tmp/jdbcApp";
@BeforeClass
public static void setup()
{
try {
cleanup();
} catch (Exception e) {
throw new RuntimeException(e);
}
try {
Class.forName(DB_DRIVER).newInstance();
Connection con = DriverManager.getConnection(URL);
Statement stmt = con.createStatement();
String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+ " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)";
stmt.executeUpdate(createTable);
cleanTable();
insertEventsInTable(10, 0);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@AfterClass
public static void cleanup()
{
try {
FileUtils.deleteDirectory(new File(FILE_NAME));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static void cleanTable()
{
try {
Connection con = DriverManager.getConnection(URL);
Statement stmt = con.createStatement();
String cleanTable = "delete from " + TABLE_NAME;
stmt.executeUpdate(cleanTable);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
public static void insertEventsInTable(int numEvents, int offset)
{
try {
Connection con = DriverManager.getConnection(URL);
String insert = "insert into " + TABLE_NAME + " values (?,?,?)";
PreparedStatement stmt = con.prepareStatement(insert);
for (int i = 0; i < numEvents; i++, offset++) {
stmt.setInt(1, offset);
stmt.setString(2, "Account_Holder-" + offset);
stmt.setInt(3, (offset * 1000));
stmt.executeUpdate();
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Test
public void testApplication() throws Exception
{
try {
LocalMode lma = LocalMode.newInstance();
Configuration conf = new Configuration(false);
conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SimpleJdbcToHDFSApp.xml"));
lma.prepareDAG(new JdbcHDFSApp(), conf);
LocalMode.Controller lc = lma.getController();
lc.runAsync();
// wait for output files to roll
Thread.sleep(5000);
String[] extensions = { "dat.0", "tmp" };
Collection<File> list = FileUtils.listFiles(new File(FILE_NAME), extensions, false);
Assert.assertEquals("Records in file", 10, FileUtils.readLines(list.iterator().next()).size());
} catch (ConstraintViolationException e) {
Assert.fail("constraint violations: " + e.getConstraintViolations());
}
}
}