/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| package org.apache.pig.piggybank.test.storage; |
| |
| import java.io.File; |
| import java.io.FileWriter; |
| import java.io.FilenameFilter; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.sql.Connection; |
| import java.sql.DriverManager; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.sql.Date; |
| import java.text.DateFormat; |
| import java.text.ParseException; |
| import java.text.SimpleDateFormat; |
| |
| import org.apache.pig.ExecType; |
| import org.apache.pig.PigServer; |
| import org.apache.pig.backend.executionengine.ExecJob; |
| import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; |
| import org.apache.pig.test.MiniGenericCluster; |
| import org.apache.pig.test.Util; |
| import org.hsqldb.Server; |
| import org.junit.After; |
| import org.junit.Before; |
| |
| import org.junit.Test; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotSame; |
| import static org.junit.Assert.assertTrue; |
| |
| public class TestDBStorage { |
| |
| private PigServer pigServer; |
| private MiniGenericCluster cluster; |
| private Server dbServer; |
| private String driver = "org.hsqldb.jdbcDriver"; |
| // private String url = "jdbc:hsqldb:mem:."; |
| private String TMP_DIR; |
| private String dbUrl = "jdbc:hsqldb:hsql://localhost/batchtest"; |
| private String user = "sa"; |
| private String password = ""; |
| |
| private static final String INPUT_FILE = "datafile.txt"; |
| |
| public TestDBStorage() throws IOException { |
| // Initialise Pig server |
| cluster = MiniGenericCluster.buildCluster(); |
| pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); |
| pigServer.getPigContext().getProperties() |
| .setProperty(MRConfiguration.MAP_MAX_ATTEMPTS, "1"); |
| pigServer.getPigContext().getProperties() |
| .setProperty(MRConfiguration.REDUCE_MAX_ATTEMPTS, "1"); |
| System.out.println("Pig server initialized successfully"); |
| TMP_DIR = System.getProperty("user.dir") + "/build/test/"; |
| // Initialise DBServer |
| dbServer = new Server(); |
| dbServer.setDatabaseName(0, "batchtest"); |
| // dbServer.setDatabasePath(0, "mem:test;sql.enforce_strict_size=true"); |
| dbServer.setDatabasePath(0, |
| "file:" + TMP_DIR + "batchtest;"+ |
| "hsqldb.default_table_type=cached;hsqldb.cache_rows=100;sql.enforce_strict_size=true"); |
| dbServer.setLogWriter(null); |
| dbServer.setErrWriter(null); |
| dbServer.start(); |
| System.out.println("Database URL: " + dbUrl); |
| try { |
| Class.forName(driver); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| System.out.println(this + ".setUp() error: " + e.getMessage()); |
| } |
| System.out.println("Database server started on port: " + dbServer.getPort()); |
| } |
| |
| private void createFile() throws IOException { |
| PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE)); |
| w.println("100\tapple\t1.0\t2008-01-01"); |
| w.println("100\torange\t2.0\t2008-02-01"); |
| w.println("100\tbanana\t1.1\t2008-03-01"); |
| w.println("\t\t\t"); |
| w.close(); |
| Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE); |
| } |
| |
| private void createTable() throws IOException { |
| Connection con = null; |
| String sql = "create table ttt (id integer, name varchar(32), ratio double, dt date)"; |
| try { |
| con = DriverManager.getConnection(dbUrl, user, password); |
| } catch (SQLException sqe) { |
| throw new IOException("Unable to obtain a connection to the database", |
| sqe); |
| } |
| try { |
| Statement st = con.createStatement(); |
| st.executeUpdate(sql); |
| st.close(); |
| con.commit(); |
| con.close(); |
| } catch (SQLException sqe) { |
| throw new IOException("Cannot create table", sqe); |
| } |
| } |
| |
| @Before |
| public void setUp() throws IOException { |
| createFile(); |
| createTable(); |
| } |
| |
| @After |
| public void tearDown() throws IOException { |
| new File(INPUT_FILE).delete(); |
| Util.deleteFile(cluster, INPUT_FILE); |
| pigServer.shutdown(); |
| dbServer.stop(); |
| cluster.shutDown(); |
| |
| File[] dbFiles = new File(TMP_DIR).listFiles(new FilenameFilter() { |
| @Override |
| public boolean accept(File dir, String name) { |
| if (name.startsWith("batchtest")) { |
| return true; |
| } else { |
| return false; |
| } |
| } |
| }); |
| if (dbFiles != null) { |
| for (File file : dbFiles) { |
| file.delete(); |
| } |
| } |
| } |
| |
| @Test |
| public void testWriteToDB() throws IOException, InterruptedException, ParseException { |
| String insertQuery = "insert into ttt (id, name, ratio, dt) values (?,?,?,?)"; |
| pigServer.setBatchOn(); |
| String dbStore = "org.apache.pig.piggybank.storage.DBStorage('" + driver |
| + "', '" + Util.encodeEscape(dbUrl) + "', '" + insertQuery + "');"; |
| pigServer.registerQuery("A = LOAD '" + INPUT_FILE |
| + "' as (id:int, fruit:chararray, ratio:double, dt : datetime);"); |
| pigServer.registerQuery("STORE A INTO 'dummy' USING " + dbStore); |
| ExecJob job = pigServer.executeBatch().get(0); |
| try { |
| while(!job.hasCompleted()) Thread.sleep(1000); |
| } catch(InterruptedException ie) {// ignore |
| } |
| |
| assertNotSame("Failed: " + job.getException(), job.getStatus(), |
| ExecJob.JOB_STATUS.FAILED); |
| |
| Connection con = null; |
| String selectQuery = "select id, name, ratio, dt from ttt order by name"; |
| try { |
| con = DriverManager.getConnection(dbUrl, user, password); |
| } catch (SQLException sqe) { |
| throw new IOException( |
| "Unable to obtain database connection for data verification", sqe); |
| } |
| try { |
| PreparedStatement ps = con.prepareStatement(selectQuery); |
| ResultSet rs = ps.executeQuery(); |
| |
| int expId = 100; |
| String[] expNames = {"apple", "banana", "orange"}; |
| double[] expRatios = {1.0, 1.1, 2.0}; |
| DateFormat df = new SimpleDateFormat("yyyy-MM-dd"); |
| Date[] expDates = {new Date(df.parse("2008-01-01").getTime()), |
| new Date(df.parse("2008-03-01").getTime()), |
| new Date(df.parse("2008-02-01").getTime())}; |
| for (int i = 0; i < 4; i++) { |
| rs.next(); |
| //Need to check for nulls explicitly. |
| if ( i == 0) { |
| //Id |
| rs.getInt(1); |
| assertTrue(rs.wasNull()); |
| //Name |
| rs.getString(2); |
| assertTrue(rs.wasNull()); |
| //Ratio |
| rs.getDouble(3); |
| assertTrue(rs.wasNull()); |
| //Date |
| rs.getDate(4); |
| assertTrue(rs.wasNull()); |
| } else { |
| assertEquals("Id mismatch", expId, rs.getInt(1)); |
| assertEquals("Name mismatch", expNames[i-1], rs.getString(2)); |
| assertEquals("Ratio mismatch", expRatios[i-1], rs.getDouble(3), 0.0001); |
| assertEquals("Date mismatch", expDates[i-1], rs.getDate(4)); |
| } |
| } |
| |
| } catch (SQLException sqe) { |
| throw new IOException( |
| "Unable to read data from database for verification", sqe); |
| } |
| } |
| } |