blob: 8ab0d9a81311c95b2906e6ee04d9ca8475524669 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.tools;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.falcon.cliParser.CLIParser;
import org.apache.falcon.service.FalconJPAService;
import org.apache.falcon.util.BuildProperties;
import org.apache.falcon.util.StateStoreProperties;
import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Command line utility for creating and upgrading the Falcon state store DB schema.
 *
 * <p>Supported sub-commands are {@code help}, {@code version}, {@code create} and {@code upgrade}.
 * The {@code create} and {@code upgrade} sub-commands require at least one of the
 * {@code -sqlfile <FILE>} or {@code -run} options.
 *
 * <p>JDBC connection settings are read from {@link StateStoreProperties}; the Falcon version is
 * read from the {@code project.version} entry of {@link BuildProperties}.
 */
public class FalconStateStoreDBCLI {
    public static final String HELP_CMD = "help";
    public static final String VERSION_CMD = "version";
    public static final String CREATE_CMD = "create";
    public static final String SQL_FILE_OPT = "sqlfile";
    public static final String RUN_OPT = "run";
    public static final String UPGRADE_CMD = "upgrade";

    private static final String[] FALCON_HELP =
        {"Falcon DB initialization tool currently supports Derby DB/ Mysql/ PostgreSQL"};

    /** Prefix every JDBC URL is expected to start with; the vendor name follows it. */
    private static final String JDBC_PREFIX = "jdbc:";

    private static final String GET_FALCON_DB_VERSION = "select data from FALCON_DB_PROPS where name = 'db.version'";
    private static final String CREATE_FALCON_DB_PROPS =
        "create table FALCON_DB_PROPS (name varchar(100), data varchar(100))";
    private static final String FALCON_DB_PROPS_EXISTS = "select count(*) from FALCON_DB_PROPS";
    private static final String GET_FALCON_PROPS_INFO = "select name, data from FALCON_DB_PROPS order by name";
    // Probe query used by checkDBExists(): if it runs, the schema is considered present.
    private static final String ENTITY_STATUS_QUERY =
        "select count(*) from ENTITIES where current_state IN ('RUNNING', 'SUSPENDED')";
    // Not referenced anywhere in this class; kept as-is.
    private static final String INSTANCE_STATUS_QUERY =
        "select count(*) from INSTANCES where current_state IN ('RUNNING', 'SUSPENDED')";

    // Set once run() has been invoked; a CLI instance may only be used a single time.
    private boolean instanceExists;

    /**
     * Program entry point: runs the CLI with the given arguments.
     * The exit code returned by {@link #run(String[])} is not propagated to the JVM.
     *
     * @param args command line arguments
     */
    public static void main(String[] args) {
        new FalconStateStoreDBCLI().run(args);
    }

    /** Creates a CLI instance that has not been run yet. */
    public FalconStateStoreDBCLI() {
        instanceExists = false;
    }

    /**
     * Builds the options accepted by the {@code create} and {@code upgrade} sub-commands.
     *
     * @return a new {@link Options} holding the {@code -sqlfile <FILE>} and {@code -run} options
     */
    protected Options getOptions() {
        Option sqlfile = new Option(SQL_FILE_OPT, true,
                "Generate SQL script instead of creating/upgrading the DB schema");
        Option run = new Option(RUN_OPT, false, "Confirmation option regarding DB schema creation/upgrade");
        Options options = new Options();
        options.addOption(sqlfile);
        options.addOption(run);
        return options;
    }

    /**
     * Parses the arguments and executes the requested sub-command.
     *
     * @param args command line arguments
     * @return {@code 0} on success, {@code 1} if parsing or execution failed (details go to stderr)
     * @throws IllegalStateException if this instance has already been run
     */
    public synchronized int run(String[] args) {
        if (instanceExists) {
            throw new IllegalStateException("CLI instance already used");
        }
        instanceExists = true;

        CLIParser parser = new CLIParser("falcondb", FALCON_HELP);
        parser.addCommand(HELP_CMD, "", "Display usage for all commands or specified command", new Options(), false);
        parser.addCommand(VERSION_CMD, "", "Show Falcon DB version information", new Options(), false);
        parser.addCommand(CREATE_CMD, "", "Create Falcon DB schema", getOptions(), false);
        parser.addCommand(UPGRADE_CMD, "", "Upgrade Falcon DB schema", getOptions(), false);

        try {
            CLIParser.Command command = parser.parse(args);
            String commandName = command.getName();
            if (commandName.equals(HELP_CMD)) {
                parser.showHelp();
            } else if (commandName.equals(VERSION_CMD)) {
                showVersion();
            } else {
                executeSchemaCommand(command);
            }
            return 0;
        } catch (ParseException ex) {
            System.err.println("Invalid sub-command: " + ex.getMessage());
            System.err.println();
            System.err.println(parser.shortHelp());
            return 1;
        } catch (Exception ex) {
            // Top-level boundary of the tool: report everything and signal failure via the return code.
            System.err.println();
            System.err.println("Error: " + ex.getMessage());
            System.err.println();
            System.err.println("Stack trace for the error was (for debug purposes):");
            System.err.println("--------------------------------------");
            ex.printStackTrace(System.err);
            System.err.println("--------------------------------------");
            System.err.println();
            return 1;
        }
    }

    /**
     * Runs a {@code create} or {@code upgrade} sub-command.
     *
     * @param command the parsed sub-command
     * @throws Exception if neither {@code -sqlfile} nor {@code -run} was given, or the operation fails
     */
    private void executeSchemaCommand(CLIParser.Command command) throws Exception {
        CommandLine commandLine = command.getCommandLine();
        boolean hasSqlFile = commandLine.hasOption(SQL_FILE_OPT);
        boolean run = commandLine.hasOption(RUN_OPT);
        if (!hasSqlFile && !run) {
            throw new Exception("'-sqlfile <FILE>' or '-run' options must be specified");
        }

        // Without an explicit -sqlfile the generated SQL still goes to a file, a temporary one.
        String sqlFile = hasSqlFile
                ? commandLine.getOptionValue(SQL_FILE_OPT)
                : File.createTempFile("falcondb-", ".sql").getAbsolutePath();

        String commandName = command.getName();
        if (commandName.equals(CREATE_CMD)) {
            createDB(sqlFile, run);
        } else if (commandName.equals(UPGRADE_CMD)) {
            upgradeDB(sqlFile, run);
        }

        System.out.println("The SQL commands have been written to: " + sqlFile);
        if (!run) {
            System.out.println("WARN: The SQL commands have NOT been executed, you must use the '-run' option");
        }
    }

    /**
     * Upgrades the schema and the stored {@code db.version} when the DB version is older than the
     * Falcon version; does nothing (besides printing a message) when it is already up to date.
     *
     * @param sqlFile file the SQL commands are written to
     * @param run whether the SQL commands are also executed against the DB
     * @throws Exception if the DB is unreachable, the schema does not exist, a version is missing,
     *                   or the upgrade fails
     */
    private void upgradeDB(String sqlFile, boolean run) throws Exception {
        validateConnection();
        if (!checkDBExists()) {
            throw new Exception("Falcon DB doesn't exist");
        }
        String falconVersion = BuildProperties.get().getProperty("project.version");
        if (falconVersion == null) {
            throw new Exception("Falcon build property 'project.version' is not set");
        }
        String dbVersion = getFalconDBVersion();
        if (dbVersion == null) {
            throw new Exception("Falcon DB 'db.version' in FALCON_DB_PROPS table is null");
        }
        // Numeric comparison: a plain String.compareTo would rank "0.9" above "0.10".
        if (compareVersions(dbVersion, falconVersion) >= 0) {
            System.out.println("Falcon DB already upgraded to Falcon version '" + falconVersion + "'");
            return;
        }
        createUpgradeDB(sqlFile, run, false);
        upgradeFalconDBVersion(sqlFile, run, falconVersion);
        // any post upgrade tasks
        if (run) {
            System.out.println("Falcon DB has been upgraded to Falcon version '" + falconVersion + "'");
        }
    }

    /**
     * Compares two version strings component by component.
     *
     * <p>Anything from the first {@code '-'} on (for example {@code -SNAPSHOT}) is ignored for the
     * component comparison. Components are separated by {@code '.'}; missing trailing components
     * count as {@code 0}. Two all-digit components are compared numerically, any other pair is
     * compared as strings. If all components are equal, the result is the plain
     * {@link String#compareTo(String)} of the full strings.
     *
     * @param left first version, must not be {@code null}
     * @param right second version, must not be {@code null}
     * @return negative, zero or positive if {@code left} is older than, equal to or newer than {@code right}
     */
    private static int compareVersions(String left, String right) {
        String[] leftParts = stripQualifier(left).split("\\.");
        String[] rightParts = stripQualifier(right).split("\\.");
        int count = Math.max(leftParts.length, rightParts.length);
        for (int i = 0; i < count; i++) {
            String leftPart = (i < leftParts.length) ? leftParts[i] : "0";
            String rightPart = (i < rightParts.length) ? rightParts[i] : "0";
            int result = (isDigits(leftPart) && isDigits(rightPart))
                    ? compareDigits(leftPart, rightPart)
                    : leftPart.compareTo(rightPart);
            if (result != 0) {
                return result;
            }
        }
        return left.compareTo(right);
    }

    /** Returns the part of {@code version} before the first {@code '-'}, or all of it if there is none. */
    private static String stripQualifier(String version) {
        int dash = version.indexOf('-');
        return (dash < 0) ? version : version.substring(0, dash);
    }

    /** Returns whether {@code text} is non-empty and consists only of the characters '0' to '9'. */
    private static boolean isDigits(String text) {
        if (text.length() == 0) {
            return false;
        }
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (c < '0' || c > '9') {
                return false;
            }
        }
        return true;
    }

    /** Compares two all-digit strings by numeric value without parsing, so length is unbounded. */
    private static int compareDigits(String left, String right) {
        String l = stripLeadingZeros(left);
        String r = stripLeadingZeros(right);
        if (l.length() != r.length()) {
            return (l.length() < r.length()) ? -1 : 1;
        }
        // Same number of significant digits: lexicographic order equals numeric order.
        return l.compareTo(r);
    }

    /** Removes leading '0' characters, always keeping at least the last character. */
    private static String stripLeadingZeros(String digits) {
        int start = 0;
        while (start < digits.length() - 1 && digits.charAt(start) == '0') {
            start++;
        }
        return digits.substring(start);
    }

    /**
     * Appends the {@code db.version} update statement to the SQL file and, if requested, executes it.
     *
     * @param sqlFile file the statement is appended to
     * @param run whether the statement is also executed against the DB
     * @param version version value to store
     * @throws Exception if the file cannot be written or the update fails
     */
    private void upgradeFalconDBVersion(String sqlFile, boolean run, String version) throws Exception {
        String updateDBVersion = "update FALCON_DB_PROPS set data='" + version + "' where name='db.version'";
        appendToSqlFile(sqlFile, updateDBVersion);
        System.out.println("Upgrade db.version in FALCON_DB_PROPS table to " + version);
        if (run) {
            Connection conn = createConnection();
            Statement st = null;
            try {
                conn.setAutoCommit(true);
                st = conn.createStatement();
                st.executeUpdate(updateDBVersion);
            } catch (Exception ex) {
                throw new Exception("Could not upgrade db.version in FALCON_DB_PROPS table: " + ex.toString(), ex);
            } finally {
                release(null, st, conn);
            }
        }
        System.out.println("DONE");
    }

    /**
     * Reads the {@code db.version} value from the FALCON_DB_PROPS table.
     *
     * @return the stored value (as returned by the driver, so possibly {@code null})
     * @throws Exception if the table cannot be queried or has no {@code db.version} row
     */
    private String getFalconDBVersion() throws Exception {
        String version;
        System.out.println("Get Falcon DB version");
        Connection conn = createConnection();
        Statement st = null;
        ResultSet rs = null;
        try {
            st = conn.createStatement();
            rs = st.executeQuery(GET_FALCON_DB_VERSION);
            if (rs.next()) {
                version = rs.getString(1);
            } else {
                throw new Exception("ERROR: Could not find Falcon DB 'db.version' in FALCON_DB_PROPS table");
            }
        } catch (Exception ex) {
            throw new Exception("ERROR: Could not query FALCON_DB_PROPS table: " + ex.toString(), ex);
        } finally {
            release(rs, st, conn);
        }
        System.out.println("DONE");
        return version;
    }

    /**
     * Collects the JDBC settings from {@link StateStoreProperties}.
     *
     * @return map with the keys {@code driver}, {@code url}, {@code user}, {@code password} and
     *         {@code dbtype} (the vendor segment of the URL, i.e. the text between
     *         {@code jdbc:} and the next {@code ':'})
     * @throws RuntimeException if the URL is missing, does not start with {@code jdbc:} or has no vendor segment
     * @throws Exception declared for callers; not thrown directly by this method
     */
    private Map<String, String> getJdbcConf() throws Exception {
        Map<String, String> jdbcConf = new HashMap<String, String>();
        jdbcConf.put("driver", StateStoreProperties.get().getProperty(FalconJPAService.DRIVER));
        String url = StateStoreProperties.get().getProperty(FalconJPAService.URL);
        jdbcConf.put("url", url);
        jdbcConf.put("user", StateStoreProperties.get().getProperty(FalconJPAService.USERNAME));
        jdbcConf.put("password", StateStoreProperties.get().getProperty(FalconJPAService.PASSWORD));

        // Guard against a missing or malformed URL before slicing it.
        if (url == null || !url.startsWith(JDBC_PREFIX)) {
            throw new RuntimeException("Invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'");
        }
        String afterPrefix = url.substring(JDBC_PREFIX.length());
        int vendorEnd = afterPrefix.indexOf(":");
        if (vendorEnd <= 0) {
            throw new RuntimeException("Invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'");
        }
        jdbcConf.put("dbtype", afterPrefix.substring(0, vendorEnd));
        return jdbcConf;
    }

    /**
     * Builds the argument list passed to the OpenJPA {@code MappingTool}.
     *
     * @param sqlFile value for the {@code -sqlFile} argument, or {@code null} to omit that argument
     * @param create {@code true} for schema action {@code add}, {@code false} for {@code refresh}
     * @return the argument array, ending with the persistent bean class names
     * @throws Exception if the JDBC configuration cannot be read
     */
    private String[] createMappingToolArguments(String sqlFile, boolean create) throws Exception {
        Map<String, String> conf = getJdbcConf();
        List<String> args = new ArrayList<String>();
        args.add("-schemaAction");
        args.add(create ? "add" : "refresh");
        args.add("-p");
        args.add("persistence.xml#falcon-" + conf.get("dbtype"));
        args.add("-connectionDriverName");
        args.add(conf.get("driver"));
        args.add("-connectionURL");
        args.add(conf.get("url"));
        args.add("-connectionUserName");
        args.add(conf.get("user"));
        args.add("-connectionPassword");
        args.add(conf.get("password"));
        if (sqlFile != null) {
            args.add("-sqlFile");
            args.add(sqlFile);
        }
        args.add("-indexes");
        args.add("true");
        args.add("org.apache.falcon.persistence.EntityBean");
        args.add("org.apache.falcon.persistence.InstanceBean");
        args.add("org.apache.falcon.persistence.PendingInstanceBean");
        args.add("org.apache.falcon.persistence.MonitoredEntityBean");
        args.add("org.apache.falcon.persistence.EntitySLAAlertBean");
        args.add("org.apache.falcon.persistence.BacklogMetricBean");
        args.add("org.apache.falcon.persistence.ExtensionBean");
        args.add("org.apache.falcon.persistence.ExtensionJobsBean");
        args.add("org.apache.falcon.persistence.ProcessInstanceInfoBean");
        return args.toArray(new String[args.size()]);
    }

    /**
     * Creates the schema and the FALCON_DB_PROPS table; returns without doing anything if the
     * schema already exists.
     *
     * @param sqlFile file the SQL commands are written to
     * @param run whether the SQL commands are also executed against the DB
     * @throws Exception if the DB is unreachable, FALCON_DB_PROPS already exists, or creation fails
     */
    private void createDB(String sqlFile, boolean run) throws Exception {
        validateConnection();
        if (checkDBExists()) {
            return;
        }
        verifyFalconPropsTable(false);
        createUpgradeDB(sqlFile, run, true);
        String falconVersion = BuildProperties.get().getProperty("project.version");
        createFalconPropsTable(sqlFile, run, falconVersion);
        if (run) {
            System.out.println("Falcon DB has been created for Falcon version '"
                    + BuildProperties.get().getProperty("project.version") + "'");
        }
    }

    /**
     * Appends the FALCON_DB_PROPS creation and {@code db.version} insert statements to the SQL
     * file and, if requested, executes them.
     *
     * @param sqlFile file the statements are appended to
     * @param run whether the statements are also executed against the DB
     * @param version version value stored as {@code db.version}
     * @throws Exception if the file cannot be written or the statements fail
     */
    private void createFalconPropsTable(String sqlFile, boolean run, String version) throws Exception {
        String insertDbVersion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')";
        appendToSqlFile(sqlFile, CREATE_FALCON_DB_PROPS, insertDbVersion);
        System.out.println("Create FALCON_DB_PROPS table");
        if (run) {
            Connection conn = createConnection();
            Statement st = null;
            try {
                conn.setAutoCommit(true);
                st = conn.createStatement();
                st.executeUpdate(CREATE_FALCON_DB_PROPS);
                st.executeUpdate(insertDbVersion);
            } catch (Exception ex) {
                throw new Exception("Could not create FALCON_DB_PROPS table: " + ex.toString(), ex);
            } finally {
                release(null, st, conn);
            }
        }
        System.out.println("DONE");
    }

    /**
     * Appends an empty line followed by the given statements, one per line, to the SQL file.
     *
     * @param sqlFile file to append to
     * @param statements lines to write
     * @throws Exception if the file cannot be opened or a write error occurred
     */
    private void appendToSqlFile(String sqlFile, String... statements) throws Exception {
        PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
        try {
            writer.println();
            for (String statement : statements) {
                writer.println(statement);
            }
            // PrintWriter never throws on write failures; checkError() flushes and reports them.
            if (writer.checkError()) {
                throw new Exception("Could not write SQL commands to: " + sqlFile);
            }
        } finally {
            writer.close();
        }
    }

    /**
     * Checks whether the FALCON_DB_PROPS table can be queried and compares that with the expectation.
     *
     * @param exists the expected state of the table
     * @return whether the table could be queried (always equal to {@code exists} on normal return)
     * @throws Exception if the actual state differs from {@code exists}, or resources cannot be released
     */
    private boolean verifyFalconPropsTable(boolean exists) throws Exception {
        System.out.println((exists) ? "Check FALCON_DB_PROPS table exists"
                : "Checking FALCON_DB_PROPS table does not exist");
        boolean tableExists;
        Connection conn = createConnection();
        Statement st = null;
        ResultSet rs = null;
        try {
            st = conn.createStatement();
            rs = st.executeQuery(FALCON_DB_PROPS_EXISTS);
            rs.next();
            tableExists = true;
        } catch (Exception ignored) {
            // Any failure of the probe query is taken to mean the table is absent.
            tableExists = false;
        } finally {
            release(rs, st, conn);
        }
        if (tableExists != exists) {
            throw new Exception("FALCON_DB_PROPS_TABLE table " + ((exists) ? "does not exist" : "exists"));
        }
        System.out.println("DONE");
        return tableExists;
    }

    /**
     * Closes the result set if it is not {@code null}; a failure is only reported on stdout.
     *
     * @param rs result set to close, may be {@code null}
     */
    private void closeResultSet(ResultSet rs) {
        try {
            if (rs != null) {
                rs.close();
            }
        } catch (Exception e) {
            System.out.println("Unable to close ResultSet " + rs);
        }
    }

    /**
     * Closes the statement if it is not {@code null}.
     *
     * @param st statement to close, may be {@code null}
     * @throws Exception wrapping the failure if the statement cannot be closed
     */
    private void closeStatement(Statement st) throws Exception {
        try {
            if (st != null) {
                st.close();
            }
        } catch (Exception e) {
            System.out.println("Unable to close SQL Statement " + st);
            throw new Exception(e);
        }
    }

    /**
     * Closes the result set, the statement and the connection, in that order. The connection is
     * closed even when closing the statement fails.
     *
     * @param rs result set to close, may be {@code null}
     * @param st statement to close, may be {@code null}
     * @param conn connection to close, must not be {@code null}
     * @throws Exception if the statement or the connection cannot be closed
     */
    private void release(ResultSet rs, Statement st, Connection conn) throws Exception {
        try {
            closeResultSet(rs);
            closeStatement(st);
        } finally {
            conn.close();
        }
    }

    /**
     * Loads the configured JDBC driver class and opens a new connection.
     *
     * @return a new connection; the caller is responsible for closing it
     * @throws Exception if the configuration is invalid, the driver cannot be loaded, or the connection fails
     */
    private Connection createConnection() throws Exception {
        Map<String, String> conf = getJdbcConf();
        Class.forName(conf.get("driver")).newInstance();
        return DriverManager.getConnection(conf.get("url"), conf.get("user"), conf.get("password"));
    }

    /**
     * Verifies that a connection can be opened and closed.
     *
     * @throws Exception if the connection attempt fails
     */
    private void validateConnection() throws Exception {
        System.out.println("Validating DB Connection");
        try {
            createConnection().close();
            System.out.println("DONE");
        } catch (Exception ex) {
            throw new Exception("Could not connect to the database: " + ex.toString(), ex);
        }
    }

    /**
     * Probes for the schema by running a count query against the ENTITIES table.
     *
     * @return {@code true} if the probe query ran, {@code false} if it failed for any reason
     * @throws Exception if a connection cannot be opened or resources cannot be released
     */
    private boolean checkDBExists() throws Exception {
        boolean schemaExists;
        Connection conn = createConnection();
        ResultSet rs = null;
        Statement st = null;
        try {
            st = conn.createStatement();
            rs = st.executeQuery(ENTITY_STATUS_QUERY);
            rs.next();
            schemaExists = true;
        } catch (Exception ignored) {
            // Any failure of the probe query is taken to mean the schema is absent.
            schemaExists = false;
        } finally {
            release(rs, st, conn);
        }
        System.out.println("DB schema " + ((schemaExists) ? "exists" : "does not exist"));
        return schemaExists;
    }

    /**
     * Invokes the OpenJPA {@code MappingTool} with the {@code -sqlFile} argument and, when
     * {@code run} is set, a second time without it.
     *
     * @param sqlFile file passed as {@code -sqlFile} on the first invocation
     * @param run whether the second invocation (without {@code -sqlFile}) takes place
     * @param create {@code true} for schema creation, {@code false} for upgrade
     * @throws Exception if the tool arguments cannot be built or the tool fails
     */
    private void createUpgradeDB(String sqlFile, boolean run, boolean create) throws Exception {
        System.out.println((create) ? "Create SQL schema" : "Upgrade SQL schema");
        String[] args = createMappingToolArguments(sqlFile, create);
        org.apache.openjpa.jdbc.meta.MappingTool.main(args);
        if (run) {
            args = createMappingToolArguments(null, create);
            org.apache.openjpa.jdbc.meta.MappingTool.main(args);
        }
        System.out.println("DONE");
    }

    /**
     * Prints the Falcon server version followed by the contents of the FALCON_DB_PROPS table.
     *
     * @throws Exception if the DB is unreachable, the schema or the FALCON_DB_PROPS table is
     *                   missing, or the table cannot be queried
     */
    private void showVersion() throws Exception {
        System.out.println("Falcon Server version: "
                + BuildProperties.get().getProperty("project.version"));
        validateConnection();
        if (!checkDBExists()) {
            throw new Exception("Falcon DB doesn't exist");
        }
        try {
            verifyFalconPropsTable(true);
        } catch (Exception ex) {
            // Keep the underlying failure as the cause so it shows up in the stack trace.
            throw new Exception("ERROR: It seems this Falcon DB was never upgraded with the 'falcondb' tool", ex);
        }
        showFalconPropsInfo();
    }

    /**
     * Prints every name/data pair of the FALCON_DB_PROPS table, ordered by name.
     *
     * @throws Exception if the table cannot be queried or resources cannot be released
     */
    private void showFalconPropsInfo() throws Exception {
        Connection conn = createConnection();
        Statement st = null;
        ResultSet rs = null;
        try {
            System.out.println("Falcon DB Version Information");
            System.out.println("--------------------------------------");
            st = conn.createStatement();
            rs = st.executeQuery(GET_FALCON_PROPS_INFO);
            while (rs.next()) {
                System.out.println(rs.getString(1) + ": " + rs.getString(2));
            }
            System.out.println("--------------------------------------");
        } catch (Exception ex) {
            throw new Exception("ERROR querying FALCON_DB_PROPS table: " + ex.toString(), ex);
        } finally {
            release(rs, st, conn);
        }
    }
}