/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.database;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
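
/**
 * Rolls node-level metric rows up into the matching cluster_* tables by
 * averaging every non-VARCHAR column, grouped by timestamp.
 */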
public class MetricsAggregation {
private static Log log = LogFactory.getLog(MetricsAggregation.class);
private static Connection conn = null;
private static Statement stmt = null;
private static ResultSet rs = null;
private static DatabaseConfig mdlConfig;
/**
* @param args is list of command line parameters
* @throws SQLException if SQL query fails
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
"SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE",
justification = "Dynamic based upon tables in the database")
public static void main(String[] args) throws SQLException {
mdlConfig = new DatabaseConfig();
// Connect to the database
String jdbc_url = System.getenv("JDBC_URL_PREFIX")
+ mdlConfig.get("jdbc.host") + "/" + mdlConfig.get("jdbc.db");
if (mdlConfig.get("jdbc.user") != null) {
jdbc_url = jdbc_url + "?user=" + mdlConfig.get("jdbc.user");
if (mdlConfig.get("jdbc.password") != null) {
jdbc_url = jdbc_url + "&password=" + mdlConfig.get("jdbc.password");
}
}
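// Illustrative final URL, assuming JDBC_URL_PREFIX=jdbc:mysql:// and the
// usual mdl keys: jdbc:mysql://dbhost:3306/chukwa?user=chukwa&password=...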
try {
// The newInstance() call is a work around for some
// broken Java implementations
org.apache.hadoop.chukwa.util.DriverManagerUtil.loadDriver().newInstance();
log.info("Initialized JDBC URL: " + jdbc_url);
} catch (Exception ex) {
// log and continue; getConnection below fails anyway if no driver loaded
log.error("Failed to load the JDBC driver", ex);
}
try {
conn = org.apache.hadoop.chukwa.util.DriverManagerUtil.getConnection(jdbc_url);
} catch (SQLException ex) {
log.error("Failed to connect to the database", ex);
return; // conn is null; bail out instead of failing on the first query
}
// Default scan window: the last 24 hours, ending 10 minutes short of now.
// It is only used to locate the weekly table partitions; the per-table
// start is re-read from aggregation_admin_table inside the loop.
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis() - (1000 * 60 * 60 * 24);
long end = System.currentTimeMillis() - (1000 * 60 * 10);
// resolve the weekly cluster_system_metrics partitions covering the window
DatabaseConfig dbConf = new DatabaseConfig();
String[] tables = dbConf.findTableName("cluster_system_metrics_2018_week",
start, end);
for (String table : tables) {
System.out.println("Table to aggregate per Ts: " + table);
stmt = conn.createStatement();
rs = stmt
.executeQuery("select table_ts from aggregation_admin_table where table_name=\""
+ table + "\"");
if (rs.next()) {
start = rs.getLong(1);
} else {
start = 0;
}
end = start + (1000 * 60 * 60 * 1); // aggregate at most one hour per run
long now = System.currentTimeMillis();
now = now - (1000 * 60 * 10); // stay 10 minutes behind real time
end = Math.min(now, end);
// TODO REMOVE DEBUG ONLY!
end = now;
System.out.println("Start Date:" + new Date(start));
System.out.println("End Date:" + new Date(end));
DatabaseMetaData dbm = conn.getMetaData();
rs = dbm.getColumns(null, null, table, null);
List<String> cols = new ArrayList<String>();
while (rs.next()) {
String s = rs.getString(4); // column 4 = COLUMN_NAME
System.out.println("Name: " + s);
int type = rs.getInt(5); // column 5 = DATA_TYPE (a java.sql.Types code)
if (type == java.sql.Types.VARCHAR) {
// VARCHAR columns cannot be averaged; skip them
System.out.println("Type: Varchar " + type);
} else {
// keep numeric (and timestamp) columns for avg()
cols.add(s);
System.out.println("Type: Number " + type);
}
} // end of while
// Build "insert into <cluster table> (cols...) (select avg(col)... )" from
// the source table, e.g. cluster_system_metrics_X -> system_metrics_X
String initTable = table.replace("cluster_", "");
StringBuilder sb0 = new StringBuilder();
StringBuilder sb = new StringBuilder();
sb0.append("insert into ").append(table).append(" (");
sb.append(" ( select ");
for (int i = 0; i < cols.size(); i++) {
sb0.append(cols.get(i));
sb.append("avg(").append(cols.get(i)).append(") ");
if (i < cols.size() - 1) {
sb0.append(",");
sb.append(",");
}
}
sb.append(" from ").append(initTable);
sb.append(" where timestamp between \"");
sb.append(formatter.format(start));
sb.append("\" and \"").append(formatter.format(end));
sb.append("\" group by timestamp )");
// close fields
sb0.append(" )").append(sb);
System.out.println(sb0.toString());
// run the aggregation and the watermark update as a single transaction
conn.setAutoCommit(false);
stmt = conn.createStatement();
final String query = sb0.toString();
stmt.execute(query);
// record the watermark as epoch millis, matching the getLong() read above;
// REPLACE covers both the first run and updates, assuming table_name is a
// unique key (MySQL syntax)
stmt = conn.createStatement();
stmt.execute("replace into aggregation_admin_table set table_name=\""
+ table + "\", table_ts=" + end);
conn.commit();
}
conn.close();
}
}