blob: ce857a04deac2b8ba0245568cbb4b893f80d48f5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.eventdb.db;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import org.apache.ambari.eventdb.model.DataTable;
import org.apache.ambari.eventdb.model.DataTable.AvgData;
import org.apache.ambari.eventdb.model.DataTable.Summary;
import org.apache.ambari.eventdb.model.DataTable.Summary.SummaryFields;
import org.apache.ambari.eventdb.model.DataTable.Times;
import org.apache.ambari.eventdb.model.Jobs.JobDBEntry;
import org.apache.ambari.eventdb.model.Jobs.JobDBEntry.JobFields;
import org.apache.ambari.eventdb.model.TaskAttempt;
import org.apache.ambari.eventdb.model.TaskAttempt.TaskAttemptFields;
import org.apache.ambari.eventdb.model.WorkflowContext;
import org.apache.ambari.eventdb.model.Workflows;
import org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry;
import org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry.WorkflowFields;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
public class PostgresConnector implements DBConnector {
/** Logger shared by every connector instance; final so it cannot be reassigned. */
private static final Log LOG = LogFactory.getLog(PostgresConnector.class);
/** Table queried by the workflow statements. */
private static final String WORKFLOW_TABLE_NAME = "workflow";
/** Table queried by the job statements. */
private static final String JOB_TABLE_NAME = "job";
/** Table queried by the task-attempt statement. */
private static final String TASK_ATTEMPT_TABLE_NAME = "taskattempt";
/** SQL keyword appended to ORDER BY for ascending sorts. */
public static final String SORT_ASC = "ASC";
/** SQL keyword appended to ORDER BY for descending sorts. */
public static final String SORT_DESC = "DESC";
/** JSON mapper used to deserialize the stored workflow context column. */
private static final ObjectMapper jsonMapper = new ObjectMapper();
/** JDBC connection; null when the connection could not be opened or after {@link #close()}. */
private Connection db;
/**
 * SQL text of every statement this connector knows how to prepare.
 *
 * The SJ_* and UJ_* constants carry empty SQL; in this class they are only
 * referenced from commented-out code in the unimplemented submit/update paths.
 * The remaining constants are SELECT statements; those containing "?" expect
 * their parameters to be bound by the caller before execution.
 */
public static enum Statements {
  SJ_INSERT_JOB_PS(""),
  SJ_CHECK_WORKFLOW_PS(""),
  SJ_INSERT_WORKFLOW_PS(""),
  UJ_UPDATE_JOB_PS(""),
  UJ_UPDATE_WORKFLOW_PS(""),
  /** Selects all workflow columns; callers may append WHERE / ORDER BY text. */
  FW_PS("SELECT " + WorkflowDBEntry.WORKFLOW_FIELDS
      + " FROM " + WORKFLOW_TABLE_NAME),
  /** Counts all rows of the workflow table. */
  FW_COUNT_PS("SELECT count(*) as " + SummaryFields.numRows
      + " FROM " + WORKFLOW_TABLE_NAME),
  /**
   * Row count plus avg/min/max aggregates over jobs, input, output and duration,
   * and the min/max start time.
   * NOTE(review): min(start time) is aliased "youngest" and max(start time)
   * "oldest" -- confirm this is the intended meaning.
   */
  FW_SUMMARY_PS("SELECT count(*) as " + SummaryFields.numRows
      + ", " + getAvg(WorkflowFields.NUMJOBSTOTAL, SummaryFields.avgJobs, SummaryFields.minJobs, SummaryFields.maxJobs)
      + ", " + getAvg(WorkflowFields.INPUTBYTES, SummaryFields.avgInput, SummaryFields.minInput, SummaryFields.maxInput)
      + ", " + getAvg(WorkflowFields.OUTPUTBYTES, SummaryFields.avgOutput, SummaryFields.minOutput, SummaryFields.maxOutput)
      + ", " + getAvg(WorkflowFields.DURATION, SummaryFields.avgDuration, SummaryFields.minDuration, SummaryFields.maxDuration)
      + ", min(" + WorkflowFields.STARTTIME + ") as " + SummaryFields.youngest
      + ", max(" + WorkflowFields.STARTTIME + ") as " + SummaryFields.oldest
      + " FROM " + WORKFLOW_TABLE_NAME),
  /** Selects the jobs of one workflow; parameter 1 is the workflow id. */
  FJD_PS("SELECT " + JobDBEntry.JOB_FIELDS
      + " FROM " + JOB_TABLE_NAME
      + " WHERE " + JobFields.WORKFLOWID.toString() + " = ?"),
  /** Selects submit and finish time of one job; parameter 1 is the job id. */
  FJSS_PS("SELECT " + JobFields.SUBMITTIME + ", " + JobFields.FINISHTIME
      + " FROM " + JOB_TABLE_NAME
      + " WHERE " + JobFields.JOBID + " = ?"),
  /** Selects task attempts ordered by start time; parameters are job id and task type. */
  FTA_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS
      + " FROM " + TASK_ATTEMPT_TABLE_NAME
      + " WHERE " + TaskAttemptFields.JOBID + " = ? AND " + TaskAttemptFields.TASKTYPE + " = ?"
      + " ORDER BY " + TaskAttemptFields.STARTTIME);

  /** The SQL text associated with this constant. */
  private final String statementString;

  Statements(String sql) {
    this.statementString = sql;
  }

  /** @return the SQL text associated with this constant */
  public String getStatementString() {
    return this.statementString;
  }

  /**
   * Builds the "avg(f) as a, min(f) as b, max(f) as c" column list for one field.
   *
   * @param field column to aggregate
   * @param avg alias for the average
   * @param min alias for the minimum
   * @param max alias for the maximum
   * @return the three aggregate expressions, comma separated
   */
  private static String getAvg(WorkflowFields field, SummaryFields avg, SummaryFields min, SummaryFields max) {
    StringBuilder columns = new StringBuilder();
    columns.append("avg(").append(field).append(") as ").append(avg);
    columns.append(", min(").append(field).append(") as ").append(min);
    columns.append(", max(").append(field).append(") as ").append(max);
    return columns.toString();
  }
}
/**
 * Cache of statements prepared from the fixed SQL in {@link Statements}.
 * Final because getPS() synchronizes on this object; a reassignable monitor
 * would silently stop guarding the cache.
 */
private final Map<Statements,PreparedStatement> preparedStatements = new EnumMap<Statements,PreparedStatement>(Statements.class);
public PostgresConnector(String hostname, String dbname, String username, String password) throws IOException {
String url = "jdbc:postgresql://" + hostname + "/" + dbname;
try {
Class.forName("org.postgresql.Driver");
db = DriverManager.getConnection(url, username, password);
} catch (ClassNotFoundException e) {
db = null;
throw new IOException(e);
} catch (SQLException e) {
db = null;
throw new IOException(e);
}
}
/**
 * Not implemented: always throws NotImplementedException.
 * The commented-out lines below name the statements reserved for this path.
 */
@Override
public void submitJob(JobDBEntry j, WorkflowContext context) throws IOException {
// PreparedStatement insertJobPS = getPS(Statements.SJ_INSERT_JOB_PS);
// PreparedStatement checkWorkflowPS = getPS(Statements.SJ_CHECK_WORKFLOW_PS);
// PreparedStatement insertWorkflowPS = getPS(Statements.SJ_INSERT_WORKFLOW_PS);
throw new NotImplementedException();
}
/**
 * Not implemented: always throws NotImplementedException.
 * The commented-out lines below name the statements reserved for this path.
 */
@Override
public void updateJob(JobDBEntry j) throws IOException {
// PreparedStatement updateJobPS = getPS(Statements.UJ_UPDATE_JOB_PS);
// PreparedStatement updateWorkflowPS = getPS(Statements.UJ_UPDATE_WORKFLOW_PS);
throw new NotImplementedException();
}
/**
 * Fetches every workflow row together with the summary over all workflows.
 *
 * @return a Workflows object holding the rows and the summary
 * @throws IOException if the connection is not initialized or a query fails
 */
@Override
public Workflows fetchWorkflows() throws IOException {
  final Workflows result = new Workflows();
  final PreparedStatement allRows = getPS(Statements.FW_PS);
  result.setWorkflows(fetchWorkflows(allRows));
  final PreparedStatement aggregates = getPS(Statements.FW_SUMMARY_PS);
  result.setSummary(fetchSummary(aggregates));
  return result;
}
/**
 * Fetches one sorted page of workflows plus the summary over all workflows.
 *
 * @param field column to sort by
 * @param sortAscending true for ascending order, false for descending
 * @param offset number of rows to skip; negative values are treated as 0
 * @param limit maximum number of rows; a negative value means no LIMIT clause
 * @return a Workflows object holding the page and the unfiltered summary
 * @throws IOException if the connection is not initialized or a query fails
 */
@Override
public Workflows fetchWorkflows(WorkflowFields field, boolean sortAscending, int offset, int limit) throws IOException {
  if (offset < 0)
    offset = 0;
  Workflows workflows = new Workflows();
  // getQualifiedPS prepares a fresh statement on every call (it is not cached),
  // so it has to be closed here once its rows have been read.
  PreparedStatement pagePS = getQualifiedPS(Statements.FW_PS, "", field, sortAscending, offset, limit);
  try {
    workflows.setWorkflows(fetchWorkflows(pagePS));
  } finally {
    try {
      pagePS.close();
    } catch (SQLException e) {
      LOG.error("Exception while closing PreparedStatement", e);
    }
  }
  // The summary statement comes from the cache and must stay open.
  workflows.setSummary(fetchSummary(getPS(Statements.FW_SUMMARY_PS)));
  return workflows;
}
/**
 * Executes the given statement and converts every returned row into a WorkflowDBEntry.
 * The result set is closed before returning; the statement itself is left open.
 *
 * @param ps statement to execute
 * @return the converted rows, in result order (possibly empty)
 * @throws IOException if the query fails or a row cannot be converted
 */
private List<WorkflowDBEntry> fetchWorkflows(PreparedStatement ps) throws IOException {
  final List<WorkflowDBEntry> entries = new ArrayList<WorkflowDBEntry>();
  ResultSet rows = null;
  try {
    rows = ps.executeQuery();
    while (rows.next()) {
      WorkflowDBEntry entry = getWorkflowDBEntry(rows);
      entries.add(entry);
    }
    return entries;
  } catch (SQLException e) {
    throw new IOException(e);
  } finally {
    if (rows != null) {
      try {
        rows.close();
      } catch (SQLException e) {
        // A failed close does not invalidate rows that were already read.
        LOG.error("Exception while closing ResultSet", e);
      }
    }
  }
}
/**
 * Executes a summary statement and copies its first row into a Summary.
 * If the query returns no row, the Summary is returned as constructed.
 * The result set is closed before returning; the statement itself is left open.
 *
 * @param ps statement producing the summary columns
 * @return the populated (or untouched) Summary
 * @throws IOException if the query fails
 */
private Summary fetchSummary(PreparedStatement ps) throws IOException {
  final Summary result = new Summary();
  ResultSet row = null;
  try {
    row = ps.executeQuery();
    if (!row.next()) {
      return result;
    }
    result.setNumRows(SummaryFields.numRows.getInt(row));
    result.setJobs(getAvgData(row, SummaryFields.avgJobs, SummaryFields.minJobs, SummaryFields.maxJobs));
    result.setInput(getAvgData(row, SummaryFields.avgInput, SummaryFields.minInput, SummaryFields.maxInput));
    result.setOutput(getAvgData(row, SummaryFields.avgOutput, SummaryFields.minOutput, SummaryFields.maxOutput));
    result.setDuration(getAvgData(row, SummaryFields.avgDuration, SummaryFields.minDuration, SummaryFields.maxDuration));
    final Times startTimes = new Times();
    startTimes.setYoungest(SummaryFields.youngest.getLong(row));
    startTimes.setOldest(SummaryFields.oldest.getLong(row));
    result.setTimes(startTimes);
    return result;
  } catch (SQLException e) {
    throw new IOException(e);
  } finally {
    if (row != null) {
      try {
        row.close();
      } catch (SQLException e) {
        LOG.error("Exception while closing ResultSet", e);
      }
    }
  }
}
/**
 * Converts the current row of a workflow query into a WorkflowDBEntry.
 * The workflow context column is read as a JSON string and deserialized
 * into a WorkflowContext.
 *
 * @param rs result set positioned on the row to convert
 * @return the populated entry
 * @throws SQLException if a column cannot be read
 * @throws JsonParseException if the context column is not well-formed JSON
 * @throws JsonMappingException if the JSON does not map onto WorkflowContext
 * @throws IOException on other failures while reading the JSON
 */
private static WorkflowDBEntry getWorkflowDBEntry(ResultSet rs) throws SQLException, JsonParseException, JsonMappingException, IOException {
  final WorkflowDBEntry entry = new WorkflowDBEntry();
  entry.setWorkflowId(WorkflowFields.WORKFLOWID.getString(rs));
  entry.setWorkflowName(WorkflowFields.WORKFLOWNAME.getString(rs));
  entry.setUserName(WorkflowFields.USERNAME.getString(rs));
  entry.setStartTime(WorkflowFields.STARTTIME.getLong(rs));
  entry.setElapsedTime(WorkflowFields.DURATION.getLong(rs));
  entry.setNumJobsTotal(WorkflowFields.NUMJOBSTOTAL.getInt(rs));
  entry.setInputBytes(WorkflowFields.INPUTBYTES.getLong(rs));
  entry.setOutputBytes(WorkflowFields.OUTPUTBYTES.getLong(rs));
  entry.setNumJobsCompleted(WorkflowFields.NUMJOBSCOMPLETED.getInt(rs));
  final String contextJson = WorkflowFields.WORKFLOWCONTEXT.getString(rs);
  final WorkflowContext context = jsonMapper.readValue(contextJson, WorkflowContext.class);
  entry.setWorkflowContext(context);
  return entry;
}
/**
 * Reads one avg/min/max column triple from the current row.
 *
 * @param rs result set positioned on the summary row
 * @param avg column holding the average (read as double)
 * @param min column holding the minimum (read as long)
 * @param max column holding the maximum (read as long)
 * @return an AvgData carrying the three values
 * @throws SQLException if a column cannot be read
 */
private static AvgData getAvgData(ResultSet rs, SummaryFields avg, SummaryFields min, SummaryFields max) throws SQLException {
  final double average = avg.getDouble(rs);
  final long minimum = min.getLong(rs);
  final long maximum = max.getLong(rs);
  final AvgData stats = new AvgData();
  stats.setAvg(average);
  stats.setMin(minimum);
  stats.setMax(maximum);
  return stats;
}
/**
 * Fetches one filtered, sorted page of workflows for a data-table style client.
 *
 * Three queries are run: a count of all workflow rows, the filtered page, and a
 * summary over the filtered rows. String filters that are null and numeric
 * bounds that are negative are left out of the search clause.
 *
 * @param offset number of rows to skip; negative values are treated as 0
 * @param limit maximum number of rows; a negative value means no LIMIT clause
 * @param searchTerm free text matched against workflow id, workflow name and user name
 * @param echo value copied unchanged into the returned table
 * @param col column to sort by
 * @param sortAscending true for ascending order, false for descending
 * @param searchWorkflowId substring to match in the workflow id
 * @param searchWorkflowName substring to match in the workflow name
 * @param searchWorkflowType prefix to match at the start of the workflow id
 * @param searchUserName exact user name to match
 * @param minJobs lower bound (inclusive) on the total number of jobs
 * @param maxJobs upper bound (inclusive) on the total number of jobs
 * @param minInputBytes lower bound (inclusive) on input bytes
 * @param maxInputBytes upper bound (inclusive) on input bytes
 * @param minOutputBytes lower bound (inclusive) on output bytes
 * @param maxOutputBytes upper bound (inclusive) on output bytes
 * @param minDuration lower bound (inclusive) on duration
 * @param maxDuration upper bound (inclusive) on duration
 * @param minStartTime lower bound (inclusive) on start time
 * @param maxStartTime upper bound (inclusive) on start time
 * @return the table with total count, filtered count, rows, echo and summary
 * @throws IOException if the connection is not initialized or a query fails
 */
@Override
public DataTable fetchWorkflows(int offset, int limit, String searchTerm, int echo, WorkflowFields col, boolean sortAscending, String searchWorkflowId,
    String searchWorkflowName, String searchWorkflowType, String searchUserName, int minJobs, int maxJobs, long minInputBytes, long maxInputBytes,
    long minOutputBytes, long maxOutputBytes, long minDuration, long maxDuration, long minStartTime, long maxStartTime) throws IOException {
  // A negative offset would produce an invalid "OFFSET -n" clause; clamp it the
  // same way the four-argument overload does.
  if (offset < 0)
    offset = 0;

  int total = 0;
  PreparedStatement ps = getPS(Statements.FW_COUNT_PS);
  ResultSet rs = null;
  try {
    rs = ps.executeQuery();
    if (rs.next())
      total = SummaryFields.numRows.getInt(rs);
  } catch (SQLException e) {
    throw new IOException(e);
  } finally {
    try {
      if (rs != null)
        rs.close();
    } catch (SQLException e) {
      LOG.error("Exception while closing ResultSet", e);
    }
  }

  String searchClause = buildSearchClause(searchTerm, searchWorkflowId, searchWorkflowName, searchWorkflowType, searchUserName, minJobs, maxJobs,
      minInputBytes, maxInputBytes, minOutputBytes, maxOutputBytes, minDuration, maxDuration, minStartTime, maxStartTime);

  // Statements from getQualifiedPS are prepared per call and not cached, so each
  // one is closed as soon as its results have been read.
  List<WorkflowDBEntry> workflows;
  PreparedStatement pagePS = getQualifiedPS(Statements.FW_PS, searchClause, col, sortAscending, offset, limit);
  try {
    workflows = fetchWorkflows(pagePS);
  } finally {
    closeQualifiedStatement(pagePS);
  }

  Summary summary;
  PreparedStatement summaryPS = getQualifiedPS(Statements.FW_SUMMARY_PS, searchClause);
  try {
    summary = fetchSummary(summaryPS);
  } finally {
    closeQualifiedStatement(summaryPS);
  }

  DataTable table = new DataTable();
  table.setiTotalRecords(total);
  table.setiTotalDisplayRecords(summary.getNumRows());
  table.setAaData(workflows);
  table.setsEcho(echo);
  table.setSummary(summary);
  return table;
}

/** Closes a one-off statement, logging a close failure instead of propagating it. */
private static void closeQualifiedStatement(PreparedStatement statement) {
  if (statement == null)
    return;
  try {
    statement.close();
  } catch (SQLException e) {
    LOG.error("Exception while closing PreparedStatement", e);
  }
}
/**
 * Fetches all jobs belonging to one workflow.
 *
 * The elapsed time of each job is finish time minus submit time, or 0 when the
 * finish time is not later than the submit time.
 *
 * @param workflowId id bound to the statement's workflow id parameter
 * @return the jobs of that workflow (possibly empty)
 * @throws IOException if the connection is not initialized or the query fails
 */
@Override
public List<JobDBEntry> fetchJobDetails(String workflowId) throws IOException {
  PreparedStatement ps = getPS(Statements.FJD_PS);
  List<JobDBEntry> jobs = new ArrayList<JobDBEntry>();
  ResultSet rs = null;
  try {
    ps.setString(1, workflowId);
    rs = ps.executeQuery();
    while (rs.next()) {
      JobDBEntry j = new JobDBEntry();
      j.setConfPath(JobFields.CONFPATH.getString(rs));
      j.setSubmitTime(JobFields.SUBMITTIME.getLong(rs));
      long finishTime = JobFields.FINISHTIME.getLong(rs);
      if (finishTime > j.getSubmitTime())
        j.setElapsedTime(finishTime - j.getSubmitTime());
      else
        j.setElapsedTime(0);
      j.setInputBytes(JobFields.INPUTBYTES.getLong(rs));
      j.setJobId(JobFields.JOBID.getString(rs));
      j.setJobName(JobFields.JOBNAME.getString(rs));
      j.setMaps(JobFields.MAPS.getInt(rs));
      j.setOutputBytes(JobFields.OUTPUTBYTES.getLong(rs));
      j.setReduces(JobFields.REDUCES.getInt(rs));
      j.setStatus(JobFields.STATUS.getString(rs));
      j.setUserName(JobFields.USERNAME.getString(rs));
      j.setWorkflowEntityName(JobFields.WORKFLOWENTITYNAME.getString(rs));
      j.setWorkflowId(JobFields.WORKFLOWID.getString(rs));
      jobs.add(j);
    }
    // The result set is closed only in the finally block: closing it here as
    // well would turn a close failure into a failed fetch even though every
    // row was read successfully.
  } catch (SQLException e) {
    throw new IOException(e);
  } finally {
    if (rs != null) {
      try {
        rs.close();
      } catch (SQLException e) {
        LOG.error("Exception while closing ResultSet", e);
      }
    }
  }
  return jobs;
}
/**
 * Looks up the submit and finish time of one job.
 *
 * A stored finish time of 0 is replaced by the current system time, and a
 * finish time earlier than the submit time is raised to the submit time.
 *
 * @param jobID id bound to the statement's job id parameter
 * @return a two-element array {submitTime, finishTime}, or null when no row
 *         matches the job id
 * @throws IOException if the connection is not initialized or the query fails
 */
@Override
public long[] fetchJobStartStopTimes(String jobID) throws IOException {
  PreparedStatement ps = getPS(Statements.FJSS_PS);
  long[] times = new long[2];
  ResultSet rs = null;
  try {
    ps.setString(1, jobID);
    rs = ps.executeQuery();
    if (!rs.next())
      return null;
    times[0] = JobFields.SUBMITTIME.getLong(rs);
    times[1] = JobFields.FINISHTIME.getLong(rs);
    // The result set is closed only in the finally block, so a close failure
    // is logged instead of discarding values that were already read.
  } catch (SQLException e) {
    throw new IOException(e);
  } finally {
    if (rs != null) {
      try {
        rs.close();
      } catch (SQLException e) {
        LOG.error("Exception while closing ResultSet", e);
      }
    }
  }
  if (times[1] == 0)
    times[1] = System.currentTimeMillis();
  if (times[1] < times[0])
    times[1] = times[0];
  return times;
}
/**
 * Fetches the task attempts of one job for one task type, in the order
 * returned by the statement (ordered by start time).
 *
 * @param jobID id bound to the statement's job id parameter
 * @param taskType value bound to the statement's task type parameter
 * @return the matching task attempts (possibly empty)
 * @throws IOException if the connection is not initialized or the query fails
 */
@Override
public List<TaskAttempt> fetchTaskAttempts(String jobID, String taskType) throws IOException {
  PreparedStatement ps = getPS(Statements.FTA_PS);
  List<TaskAttempt> taskAttempts = new ArrayList<TaskAttempt>();
  ResultSet rs = null;
  try {
    ps.setString(1, jobID);
    ps.setString(2, taskType);
    rs = ps.executeQuery();
    while (rs.next()) {
      TaskAttempt t = new TaskAttempt();
      t.setFinishTime(TaskAttemptFields.FINISHTIME.getLong(rs));
      t.setInputBytes(TaskAttemptFields.INPUTBYTES.getLong(rs));
      t.setLocality(TaskAttemptFields.LOCALITY.getString(rs));
      t.setMapFinishTime(TaskAttemptFields.MAPFINISHTIME.getLong(rs));
      t.setOutputBytes(TaskAttemptFields.OUTPUTBYTES.getLong(rs));
      t.setShuffleFinishTime(TaskAttemptFields.SHUFFLEFINISHTIME.getLong(rs));
      t.setSortFinishTime(TaskAttemptFields.SORTFINISHTIME.getLong(rs));
      t.setStartTime(TaskAttemptFields.STARTTIME.getLong(rs));
      t.setStatus(TaskAttemptFields.STATUS.getString(rs));
      t.setTaskAttemptId(TaskAttemptFields.TASKATTEMPTID.getString(rs));
      t.setTaskType(TaskAttemptFields.TASKTYPE.getString(rs));
      taskAttempts.add(t);
    }
    // The result set is closed only in the finally block, so a close failure
    // is logged instead of failing a fetch whose rows were all read.
  } catch (SQLException e) {
    throw new IOException(e);
  } finally {
    if (rs != null) {
      try {
        rs.close();
      } catch (SQLException e) {
        LOG.error("Exception while closing ResultSet", e);
      }
    }
  }
  return taskAttempts;
}
/**
 * Returns the cached PreparedStatement for a fixed statement, preparing and
 * caching it on first use. Cache access happens while holding the lock on
 * the cache map.
 *
 * @param statement which statement to return
 * @return the cached statement
 * @throws IOException if the connection is not initialized or preparing fails
 */
private PreparedStatement getPS(Statements statement) throws IOException {
  if (db == null) {
    throw new IOException("postgres db not initialized");
  }
  synchronized (preparedStatements) {
    if (preparedStatements.containsKey(statement)) {
      return preparedStatements.get(statement);
    }
    try {
      final PreparedStatement prepared = db.prepareStatement(statement.getStatementString());
      preparedStatements.put(statement, prepared);
      return prepared;
    } catch (SQLException e) {
      throw new IOException(e);
    }
  }
}
/**
 * Prepares a new statement from a fixed statement's SQL with extra SQL text
 * appended. The result is not cached; a new statement is prepared on every call.
 *
 * @param statement the base statement
 * @param searchClause SQL text appended verbatim to the base statement
 * @return the newly prepared statement
 * @throws IOException if the connection is not initialized or preparing fails
 */
private PreparedStatement getQualifiedPS(Statements statement, String searchClause) throws IOException {
  if (db == null) {
    throw new IOException("postgres db not initialized");
  }
  final String sql = statement.getStatementString() + searchClause;
  try {
    return db.prepareStatement(sql);
  } catch (SQLException e) {
    throw new IOException(e);
  }
}
/**
 * Prepares a new statement from a fixed statement's SQL followed by the search
 * clause and an ORDER BY / OFFSET / LIMIT suffix.
 *
 * @param statement the base statement
 * @param searchClause SQL text placed between the base statement and ORDER BY
 * @param field column to sort by
 * @param sortAscending true for ASC, false for DESC
 * @param offset value written after OFFSET
 * @param limit value written after LIMIT; the LIMIT clause is omitted when negative
 * @return the newly prepared statement
 * @throws IOException if the connection is not initialized or preparing fails
 */
private PreparedStatement getQualifiedPS(Statements statement, String searchClause, WorkflowFields field, boolean sortAscending, int offset, int limit)
    throws IOException {
  if (db == null) {
    throw new IOException("postgres db not initialized");
  }
  final String direction = sortAscending ? SORT_ASC : SORT_DESC;
  final StringBuilder qualifier = new StringBuilder();
  qualifier.append(searchClause);
  qualifier.append(" ORDER BY ").append(field.toString()).append(" ").append(direction);
  qualifier.append(" OFFSET ").append(offset);
  if (limit >= 0) {
    qualifier.append(" LIMIT ").append(limit);
  }
  return getQualifiedPS(statement, qualifier.toString());
}
/**
 * Adds inclusive lower/upper bound conditions on an int-valued field.
 * A bound is skipped when its value is negative.
 *
 * @param sb search clause being built
 * @param field column to constrain
 * @param min lower bound, or a negative value for none
 * @param max upper bound, or a negative value for none
 */
private static void addRangeSearch(StringBuilder sb, WorkflowFields field, int min, int max) {
  final boolean hasLowerBound = min >= 0;
  final boolean hasUpperBound = max >= 0;
  if (hasLowerBound) {
    append(sb, greaterThan(field, String.valueOf(min)));
  }
  if (hasUpperBound) {
    append(sb, lessThan(field, String.valueOf(max)));
  }
}
/**
 * Adds inclusive lower/upper bound conditions on a long-valued field.
 * A bound is skipped when its value is negative.
 *
 * @param sb search clause being built
 * @param field column to constrain
 * @param min lower bound, or a negative value for none
 * @param max upper bound, or a negative value for none
 */
private static void addRangeSearch(StringBuilder sb, WorkflowFields field, long min, long max) {
  final boolean hasLowerBound = min >= 0;
  final boolean hasUpperBound = max >= 0;
  if (hasLowerBound) {
    append(sb, greaterThan(field, String.valueOf(min)));
  }
  if (hasUpperBound) {
    append(sb, lessThan(field, String.valueOf(max)));
  }
}
/**
 * Appends one condition to the search clause, inserting the separator it needs.
 *
 * The builder is expected to start with the WHERE keyword. When it already holds
 * a condition the new one is joined with " and "; when it holds only the keyword
 * a single space is inserted, because the keyword has no trailing space and the
 * condition strings do not start with one (otherwise the result would read
 * " where" immediately followed by the column name, which is invalid SQL).
 *
 * @param sb search clause being built
 * @param s condition to append
 */
private static void append(StringBuilder sb, String s) {
  if (sb.length() > WHERE.length())
    sb.append(" and ");
  else
    sb.append(" ");
  sb.append(s);
}
/**
 * Renders a value as a PostgreSQL escape-string literal (E'...').
 *
 * Search values arrive from callers and are spliced into SQL text, so the
 * characters that could end the literal early are neutralized: backslashes are
 * doubled and single quotes are doubled. The E'' form is used because backslash
 * handling inside it does not depend on server settings, so the literal always
 * evaluates to exactly the given characters.
 */
private static String quoteLiteral(String value) {
  String text = String.valueOf(value);
  return "E'" + text.replace("\\", "\\\\").replace("'", "''") + "'";
}

/**
 * Builds a "field like '%s%'" condition (substring match).
 * LIKE wildcards contained in s are passed through unchanged.
 *
 * @param field column to match
 * @param s text to look for
 * @return the SQL condition with s safely quoted
 */
private static String like(WorkflowFields field, String s) {
  return field.toString() + " like " + quoteLiteral("%" + s + "%");
}

/**
 * Builds a "field like 's%'" condition (prefix match).
 * LIKE wildcards contained in s are passed through unchanged.
 *
 * @param field column to match
 * @param s required prefix
 * @return the SQL condition with s safely quoted
 */
private static String startsWith(WorkflowFields field, String s) {
  return field.toString() + " like " + quoteLiteral(s + "%");
}

/**
 * Builds a "field = 's'" condition (exact match).
 *
 * @param field column to match
 * @param s required value
 * @return the SQL condition with s safely quoted
 */
private static String equals(WorkflowFields field, String s) {
  return field.toString() + " = " + quoteLiteral(s);
}
/**
 * Builds an inclusive upper-bound condition, "field <= s".
 *
 * @param field column to constrain
 * @param s bound, already rendered as SQL text
 * @return the SQL condition
 */
private static String lessThan(WorkflowFields field, String s) {
  final StringBuilder condition = new StringBuilder(field.toString());
  condition.append(" <= ").append(s);
  return condition.toString();
}
/**
 * Builds an inclusive lower-bound condition, "field >= s".
 *
 * @param field column to constrain
 * @param s bound, already rendered as SQL text
 * @return the SQL condition
 */
private static String greaterThan(WorkflowFields field, String s) {
  final StringBuilder condition = new StringBuilder(field.toString());
  condition.append(" >= ").append(s);
  return condition.toString();
}
/** Keyword that starts every search clause; its length marks an "empty" clause. */
private static final String WHERE = " where";

/**
 * Assembles the WHERE clause for a workflow search.
 *
 * A non-empty searchTerm yields a parenthesized group matching it as a substring
 * of workflow id, workflow name or user name. Each non-null string filter and
 * each non-negative numeric bound adds a further condition.
 *
 * @param searchTerm free text matched against id, name and user; ignored when null or empty
 * @param searchWorkflowId substring of the workflow id; ignored when null
 * @param searchWorkflowName substring of the workflow name; ignored when null
 * @param searchWorkflowType prefix of the workflow id; ignored when null
 * @param searchUserName exact user name; ignored when null
 * @param minJobs lower bound on total jobs; ignored when negative
 * @param maxJobs upper bound on total jobs; ignored when negative
 * @param minInputBytes lower bound on input bytes; ignored when negative
 * @param maxInputBytes upper bound on input bytes; ignored when negative
 * @param minOutputBytes lower bound on output bytes; ignored when negative
 * @param maxOutputBytes upper bound on output bytes; ignored when negative
 * @param minDuration lower bound on duration; ignored when negative
 * @param maxDuration upper bound on duration; ignored when negative
 * @param minStartTime lower bound on start time; ignored when negative
 * @param maxStartTime upper bound on start time; ignored when negative
 * @return the clause text, or the empty string when no condition was added
 */
private static String buildSearchClause(String searchTerm, String searchWorkflowId, String searchWorkflowName, String searchWorkflowType,
    String searchUserName, int minJobs, int maxJobs, long minInputBytes, long maxInputBytes, long minOutputBytes, long maxOutputBytes, long minDuration,
    long maxDuration, long minStartTime, long maxStartTime) {
  final StringBuilder clause = new StringBuilder(WHERE);

  if (searchTerm != null && !searchTerm.isEmpty()) {
    clause.append(" (")
        .append(like(WorkflowFields.WORKFLOWID, searchTerm))
        .append(" or ")
        .append(like(WorkflowFields.WORKFLOWNAME, searchTerm))
        .append(" or ")
        .append(like(WorkflowFields.USERNAME, searchTerm))
        .append(")");
  }

  if (searchWorkflowId != null) {
    append(clause, like(WorkflowFields.WORKFLOWID, searchWorkflowId));
  }
  if (searchWorkflowName != null) {
    append(clause, like(WorkflowFields.WORKFLOWNAME, searchWorkflowName));
  }
  if (searchWorkflowType != null) {
    append(clause, startsWith(WorkflowFields.WORKFLOWID, searchWorkflowType));
  }
  if (searchUserName != null) {
    append(clause, equals(WorkflowFields.USERNAME, searchUserName));
  }

  addRangeSearch(clause, WorkflowFields.NUMJOBSTOTAL, minJobs, maxJobs);
  addRangeSearch(clause, WorkflowFields.INPUTBYTES, minInputBytes, maxInputBytes);
  addRangeSearch(clause, WorkflowFields.OUTPUTBYTES, minOutputBytes, maxOutputBytes);
  addRangeSearch(clause, WorkflowFields.DURATION, minDuration, maxDuration);
  addRangeSearch(clause, WorkflowFields.STARTTIME, minStartTime, maxStartTime);

  // Nothing beyond the keyword means no filter at all.
  return clause.length() == WHERE.length() ? "" : clause.toString();
}
/**
 * Closes the database connection if one is open and forgets it.
 * A failure while closing is logged, not thrown. Calling this again
 * afterwards does nothing.
 */
@Override
public void close() {
  if (db == null) {
    return;
  }
  try {
    db.close();
  } catch (SQLException e) {
    LOG.error("Exception while closing connector", e);
  }
  db = null;
}
/**
 * Closes the connection via {@link #close()} when this object is finalized.
 *
 * @throws Throwable declared by the overridden signature
 */
@Override
protected void finalize() throws Throwable {
  this.close();
}
}