/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.vertica;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
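
/**
 * Static helpers shared by the Vertica input and output formats: probing the
 * server version, validating and creating the output table, and computing
 * input splits.
 *
 * A minimal sketch of how a job might configure these helpers, assuming the
 * property constants on {@link VerticaConfiguration} are plain
 * {@link Configuration} string keys:
 *
 * <pre>
 * Configuration conf = job.getConfiguration();
 * conf.set(VerticaConfiguration.OUTPUT_TABLE_NAME_PROP, "mySchema.myTable");
 * conf.set(VerticaConfiguration.QUERY_PROP, "SELECT key, value FROM source");
 * </pre>
 */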
public class VerticaUtil {
  private static final Log LOG = LogFactory.getLog(VerticaUtil.class);
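
  /**
   * Probes the Vertica server version, encoded as major * 100 + minor (so
   * version 3.5 reports as 305). The connection obtained from
   * {@link VerticaConfiguration} is assumed to be managed by it.
   *
   * @param conf job configuration carrying the Vertica connection settings
   * @param output if true, use the output (write-side) connection settings
   * @return the encoded version number
   */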
  public static int verticaVersion(Configuration conf, boolean output)
      throws IOException {
    int ver = -1;
    try {
      VerticaConfiguration vtconfig = new VerticaConfiguration(conf);
      Connection conn = vtconfig.getConnection(output);
      DatabaseMetaData dbmd = conn.getMetaData();
      // encode major.minor as a single comparable integer, e.g. 3.5 -> 305
      ver = dbmd.getDatabaseMajorVersion() * 100;
      ver += dbmd.getDatabaseMinorVersion();
    } catch (ClassNotFoundException e) {
      // chain the cause so a missing driver is diagnosable from the stack
      throw new IOException(
          "Vertica Driver required to use Vertica Input or Output Formatters",
          e);
    } catch (SQLException e) {
      throw new IOException(e);
    }
    return ver;
  }
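
  /**
   * Verifies that the output table is configured and usable before a job
   * writes to it: truncates an existing table (3.5 and later) or drops and
   * recreates it (older servers) when drop-table is set, creates the schema
   * and table from the configured definition when missing, and asks Vertica
   * to build a temporary design for a newly created table.
   */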
  public static void checkOutputSpecs(Configuration conf) throws IOException {
    VerticaConfiguration vtconfig = new VerticaConfiguration(conf);

    String writerTable = vtconfig.getOutputTableName();
    if (writerTable == null)
      throw new IOException("Vertica output requires a table name defined by "
          + VerticaConfiguration.OUTPUT_TABLE_NAME_PROP);

    String[] def = vtconfig.getOutputTableDef();
    boolean dropTable = vtconfig.getDropTable();

    // the output table name may be qualified as "schema.table"
    String schema = null;
    String table = null;
    String[] schemaTable = writerTable.split("\\.");
    if (schemaTable.length == 2) {
      schema = schemaTable[0];
      table = schemaTable[1];
    } else {
      table = schemaTable[0];
    }
    Statement stmt = null;
    try {
      Connection conn = vtconfig.getConnection(true);
      DatabaseMetaData dbmd = conn.getMetaData();
      ResultSet rs = dbmd.getTables(null, schema, table, null);
      boolean tableExists = rs.next();

      stmt = conn.createStatement();
      if (tableExists && dropTable) {
        if (verticaVersion(conf, true) >= 305) {
          // 3.5 and later support TRUNCATE, which keeps the table definition
          stmt.execute("TRUNCATE TABLE " + writerTable);
        } else {
          // for versions before 3.5, drop the table if it exists;
          // if def is empty, grab the columns first to redefine the table
          if (def == null) {
            rs = dbmd.getColumns(null, schema, table, null);
            ArrayList<String> defs = new ArrayList<String>();
            while (rs.next())
              // COLUMN_NAME (4) and TYPE_NAME (6), per the JDBC
              // DatabaseMetaData.getColumns() column layout
              defs.add(rs.getString(4) + " " + rs.getString(6));
            def = defs.toArray(new String[0]);
          }
          stmt.execute("DROP TABLE " + writerTable + " CASCADE");
          tableExists = false; // force create below
        }
      }
      // create the table if it doesn't exist
      if (!tableExists) {
        if (def == null)
          throw new RuntimeException("Table " + writerTable
              + " does not exist and no table definition provided");
        if (schema != null) {
          rs = dbmd.getSchemas(null, schema);
          if (!rs.next())
            stmt.execute("CREATE SCHEMA " + schema);
        }
        StringBuilder tabledef = new StringBuilder("CREATE TABLE ")
            .append(writerTable).append(" (");
        for (String column : def)
          tabledef.append(column).append(",");
        tabledef.replace(tabledef.length() - 1, tabledef.length(), ")");
        stmt.execute(tabledef.toString());

        // TODO: create segmented projections
        stmt.execute("select implement_temp_design('" + writerTable + "')");
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      if (stmt != null)
        try {
          stmt.close();
        } catch (SQLException e) {
          throw new RuntimeException(e);
        }
    }
  }

  // TODO: catch when params required but missing
  // TODO: better error message when count query is bad
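  /**
   * Computes input splits in one of three ways: by running a parameter query
   * whose rows each become one split, by using explicitly configured input
   * parameters, or by wrapping the input query in a COUNT(*) and dividing the
   * result into limit/offset ranges, one per map task.
   */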
  public static List<InputSplit> getSplits(JobContext context)
      throws IOException {
    Configuration conf = context.getConfiguration();
    int numSplits = conf.getInt("mapreduce.job.maps", 1);
    LOG.debug("creating splits up to " + numSplits);
    List<InputSplit> splits = new ArrayList<InputSplit>();
    int i = 0;
    long start = 0;
    long end = 0;
    // fall back to limit/offset splitting unless a params query or explicit
    // parameters produce splits below
    boolean limitOffset = true;

    // get the params query or the params
    VerticaConfiguration config = new VerticaConfiguration(conf);
    String inputQuery = config.getInputQuery();
    if (inputQuery == null)
      throw new IOException("Vertica input requires query defined by "
          + VerticaConfiguration.QUERY_PROP);

    String paramsQuery = config.getParamsQuery();
    Collection<List<Object>> params = config.getInputParameters();

    // TODO: limit needs order by unique key
    // TODO: what if there are more parameters than numsplits?

    // wrap the input query in a COUNT(*) used by the limit/offset strategy
    String countQuery = "SELECT COUNT(*) FROM (\n" + inputQuery + "\n) count";
    if (paramsQuery != null) {
      LOG.debug("creating splits using paramsQuery: " + paramsQuery);
      Connection conn = null;
      Statement stmt = null;
      try {
        conn = config.getConnection(false);
        stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(paramsQuery);
        ResultSetMetaData rsmd = rs.getMetaData();
        // each row of the params query becomes one split, with the row's
        // columns as the bind parameters for the input query
        while (rs.next()) {
          limitOffset = false;
          List<Object> segmentParams = new ArrayList<Object>();
          for (int j = 1; j <= rsmd.getColumnCount(); j++) {
            segmentParams.add(rs.getObject(j));
          }
          splits.add(new VerticaInputSplit(inputQuery, segmentParams, start,
              end));
        }
      } catch (Exception e) {
        throw new IOException(e);
      } finally {
        try {
          if (stmt != null)
            stmt.close();
        } catch (SQLException e) {
          throw new IOException(e);
        }
      }
    } else if (params != null && params.size() > 0) {
      LOG.debug("creating splits using " + params.size() + " params");
      limitOffset = false;
      for (List<Object> segmentParams : params) {
        // if numSplits exceeds the number of params, each parameter set
        // could be subdivided with limit/offset ranges
        // TODO: write code to generate the start/end pairs for each group
        splits.add(new VerticaInputSplit(inputQuery, segmentParams, start,
            end));
      }
    }
    if (limitOffset) {
      LOG.debug("creating splits using limit and offset");
      Connection conn = null;
      Statement stmt = null;
      long count = 0;
      try {
        conn = config.getConnection(false);
        stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(countQuery);
        rs.next();
        count = rs.getLong(1);
      } catch (Exception e) {
        throw new IOException(e);
      } finally {
        try {
          if (stmt != null)
            stmt.close();
        } catch (SQLException e) {
          throw new IOException(e);
        }
      }

      long splitSize = count / numSplits;
      end = splitSize;
      LOG.debug("creating " + numSplits + " splits for " + count + " records");
      for (i = 0; i < numSplits; i++) {
        // extend the last split to the full count so rows lost to integer
        // division are not dropped
        if (i == numSplits - 1)
          end = count;
        splits.add(new VerticaInputSplit(inputQuery, null, start, end));
        start += splitSize;
        end += splitSize;
      }
    }
LOG.debug("returning " + splits.size() + " final splits");
return splits;
}
}