blob: 2310d9f6ad90962294dde1b1d6f4a50147973581 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.tool;
import com.google.common.base.Throwables;
import com.google.common.io.Files;
import com.google.common.util.concurrent.SimpleTimeLimiter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/**
* A Canary Tool to perform synthetic tests for Phoenix
* It assumes that TEST.PQSTEST or the schema.table passed in the argument
* is already present as following command
* CREATE TABLE IF NOT EXISTS TEST.PQSTEST (mykey INTEGER NOT NULL
* PRIMARY KEY, mycolumn VARCHAR, insert_date TIMESTAMP);
*
*/
public class PhoenixCanaryTool extends Configured implements Tool {
private static String TEST_SCHEMA_NAME = "TEST";
private static String TEST_TABLE_NAME = "PQSTEST";
private static String FQ_TABLE_NAME = "TEST.PQSTEST";
private static Timestamp timestamp;
private static final int MAX_CONNECTION_ATTEMPTS = 5;
private final int FIRST_TIME_RETRY_TIMEOUT = 5000;
private Sink sink = new StdOutSink();
public static final String propFileName = "phoenix-canary-file-sink.properties";
/**
* Base class for a Canary Test
*/
private abstract static class CanaryTest {
CanaryTestResult result = new CanaryTestResult();
Connection connection = null;
private void onCreate(Connection connection) {
result.setTimestamp(getCurrentTimestamp());
result.setStartTime(EnvironmentEdgeManager.currentTimeMillis());
this.connection = connection;
}
abstract void onExecute() throws Exception;
private void onExit() {
result.setExecutionTime(EnvironmentEdgeManager.currentTimeMillis() -
result.getStartTime());
}
CanaryTestResult runTest(Connection connection) {
try {
onCreate(connection);
onExecute();
result.setSuccessful(true);
result.setMessage("Test " + result.getTestName() + " successful");
} catch (Exception e) {
result.setSuccessful(false);
result.setMessage(Throwables.getStackTraceAsString(e));
} finally {
onExit();
}
return result;
}
}
static class UpsertTableTest extends CanaryTest {
void onExecute() throws Exception {
result.setTestName("upsertTable");
// Insert data
timestamp = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
String stmt = "UPSERT INTO " + FQ_TABLE_NAME
+ "(mykey, mycolumn, insert_date) VALUES (?, ?, ?)";
PreparedStatement ps = connection.prepareStatement(stmt);
ps.setInt(1, 1);
ps.setString(2, "Hello World");
ps.setTimestamp(3, timestamp);
ps.executeUpdate();
connection.commit();
}
}
static class ReadTableTest extends CanaryTest {
void onExecute() throws Exception {
result.setTestName("readTable");
PreparedStatement ps = connection.prepareStatement("SELECT * FROM "
+ FQ_TABLE_NAME+" WHERE INSERT_DATE = ?");
ps.setTimestamp(1,timestamp);
ResultSet rs = ps.executeQuery();
int totalRows = 0;
while (rs.next()) {
totalRows += 1;
Integer myKey = rs.getInt(1);
String myColumn = rs.getString(2);
if (myKey != 1 || !myColumn.equals("Hello World")) {
throw new Exception("Retrieved values do not " +
"match the inserted values");
}
}
if (totalRows != 1) {
throw new Exception(totalRows + " rows fetched instead of just one.");
}
ps.close();
rs.close();
}
}
/**
* Sink interface used by the canary to output information
*/
public interface Sink {
List<CanaryTestResult> getResults();
void updateResults(CanaryTestResult result);
void publishResults() throws Exception;
void clearResults();
}
public static class StdOutSink implements Sink {
private List<CanaryTestResult> results = new ArrayList<>();
@Override
public void updateResults(CanaryTestResult result) {
results.add(result);
}
@Override
public List<CanaryTestResult> getResults() {
return results;
}
@Override
public void publishResults() {
Gson gson = new GsonBuilder().setPrettyPrinting().create();
String resultJson = gson.toJson(results);
System.out.println(resultJson);
}
@Override
public void clearResults() {
results.clear();
}
}
/**
* Implementation of File Out Sink
*/
public static class FileOutSink implements Sink {
private List<CanaryTestResult> results = new ArrayList<>();
File dir;
String logfileName;
public FileOutSink() throws Exception {
Properties prop = new Properties();
InputStream input = ClassLoader.getSystemResourceAsStream(propFileName);
if (input == null) {
throw new Exception("Cannot load " + propFileName + " file for " + "FileOutSink.");
}
prop.load(input);
logfileName = prop.getProperty("file.name");
dir = new File(prop.getProperty("file.location"));
dir.mkdirs();
}
@Override
public void updateResults(CanaryTestResult result) {
results.add(result);
}
@Override
public List<CanaryTestResult> getResults() {
return results;
}
@Override
public void publishResults() throws Exception {
Gson gson = new GsonBuilder().setPrettyPrinting().create();
String resultJson = gson.toJson(results);
String fileName = logfileName + "-" + new SimpleDateFormat("yyyy.MM.dd.HH" + ".mm" +
".ss").format(new Date()) + ".log";
File file = new File(dir, fileName);
Files.write(Bytes.toBytes(resultJson), file);
}
@Override
public void clearResults() {
results.clear();
}
}
private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixCanaryTool.class);
private static String getCurrentTimestamp() {
return new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss.ms").format(new Date());
}
private static Namespace parseArgs(String[] args) {
ArgumentParser parser = ArgumentParsers.newFor("Phoenix Canary Test Tool").build()
.description("Phoenix Canary Test Tool");
parser.addArgument("--hostname", "-hn").type(String.class).nargs("?").help("Hostname on "
+ "which Phoenix is running.");
parser.addArgument("--port", "-p").type(String.class).nargs("?").help("Port on " +
"which Phoenix is running.");
parser.addArgument("--constring", "-cs").type(String.class).nargs("?").help("Pass an " +
"explicit connection String to connect to Phoenix. " +
"default: jdbc:phoenix:thin:serialization=PROTOBUF;url=[hostName:port]");
parser.addArgument("--timeout", "-t").type(String.class).nargs("?").setDefault("60").help
("Maximum time for which the app should run before returning error. default:" + "" +
" 60 sec");
parser.addArgument("--testschema", "-ts").type(String.class).nargs("?").setDefault
(TEST_SCHEMA_NAME).help("Custom name for the test table. " + "default: " +
TEST_SCHEMA_NAME);
parser.addArgument("--testtable", "-tt").type(String.class).nargs("?").setDefault
(TEST_TABLE_NAME).help("Custom name for the test table." + " default: " +
TEST_TABLE_NAME);
parser.addArgument("--logsinkclass", "-lsc").type(String.class).nargs("?").setDefault
("org.apache.phoenix.tool.PhoenixCanaryTool$StdOutSink").help
("Path to a Custom implementation for log sink class. default: stdout");
Namespace res = null;
try {
res = parser.parseKnownArgs(args, null);
} catch (ArgumentParserException e) {
parser.handleError(e);
}
return res;
}
private CanaryTestResult appInfo = new CanaryTestResult();
private Connection connection = null;
@Override
public int run(String[] args) throws Exception {
try {
Namespace cArgs = parseArgs(args);
if (cArgs == null) {
LOGGER.error("Argument parsing failed.");
throw new RuntimeException("Argument parsing failed");
}
final String hostName = cArgs.getString("hostname");
final String port = cArgs.getString("port");
final String timeout = cArgs.getString("timeout");
final String conString = cArgs.getString("constring");
final String testSchemaName = cArgs.getString("testschema");
final String testTableName = cArgs.getString("testtable");
final String logSinkClass = cArgs.getString("logsinkclass");
TEST_TABLE_NAME = testTableName;
TEST_SCHEMA_NAME = testSchemaName;
FQ_TABLE_NAME = testSchemaName + "." + testTableName;
// Check if at least one from host+port or con string is provided.
if ((hostName == null || port == null) && conString == null) {
throw new RuntimeException("Provide at least one from host+port or constring");
}
int timeoutVal = Integer.parseInt(timeout);
// Dynamically load a class for sink
sink = (Sink) ClassLoader.getSystemClassLoader().loadClass(logSinkClass).newInstance();
long startTime = EnvironmentEdgeManager.currentTimeMillis();
String connectionURL = (conString != null) ? conString :
"jdbc:phoenix:thin:serialization=PROTOBUF;url=" + hostName + ":" + port;
appInfo.setTestName("appInfo");
appInfo.setMiscellaneous(connectionURL);
connection = getConnectionWithRetry(connectionURL);
if (connection == null) {
LOGGER.error("Failed to get connection after multiple retries; the connection is null");
}
SimpleTimeLimiter limiter = new SimpleTimeLimiter();
limiter.callWithTimeout(new Callable<Void>() {
public Void call() {
sink.clearResults();
// Execute tests
LOGGER.info("Starting UpsertTableTest");
sink.updateResults(new UpsertTableTest().runTest(connection));
LOGGER.info("Starting ReadTableTest");
sink.updateResults(new ReadTableTest().runTest(connection));
return null;
}
}, timeoutVal, TimeUnit.SECONDS, true);
long estimatedTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
appInfo.setExecutionTime(estimatedTime);
appInfo.setSuccessful(true);
} catch (Exception e) {
LOGGER.error(Throwables.getStackTraceAsString(e));
appInfo.setMessage(Throwables.getStackTraceAsString(e));
appInfo.setSuccessful(false);
} finally {
sink.updateResults(appInfo);
sink.publishResults();
connection.close();
}
return 0;
}
private Connection getConnectionWithRetry(String connectionURL) {
Connection connection=null;
try{
connection = getConnectionWithRetry(connectionURL, true);
} catch (Exception e) {
LOGGER.info("Failed to get connection with namespace enabled", e);
try {
connection = getConnectionWithRetry(connectionURL, false);
} catch (Exception ex) {
LOGGER.info("Failed to get connection without namespace enabled", ex);
}
}
return connection;
}
private Connection getConnectionWithRetry(String connectionURL, boolean namespaceFlag)
throws Exception {
Properties connProps = new Properties();
Connection connection = null;
connProps.setProperty("phoenix.schema.mapSystemTablesToNamespace", String.valueOf(namespaceFlag));
connProps.setProperty("phoenix.schema.isNamespaceMappingEnabled", String.valueOf(namespaceFlag));
RetryCounter retrier = new RetryCounter(MAX_CONNECTION_ATTEMPTS,
FIRST_TIME_RETRY_TIMEOUT, TimeUnit.MILLISECONDS);
LOGGER.info("Trying to get the connection with "
+ retrier.getMaxAttempts() + " attempts with "
+ "connectionURL :" + connectionURL
+ "connProps :" + connProps);
while (retrier.shouldRetry()) {
try {
connection = DriverManager.getConnection(connectionURL, connProps);
} catch (SQLException e) {
LOGGER.info("Trying to establish connection with "
+ retrier.getAttemptTimes() + " attempts", e);
}
if (connection != null) {
LOGGER.info("Successfully established connection within "
+ retrier.getAttemptTimes() + " attempts");
break;
}
retrier.sleepUntilNextRetry();
}
return connection;
}
public static void main(final String[] args) {
try {
LOGGER.info("Starting Phoenix Canary Test tool...");
ToolRunner.run(new PhoenixCanaryTool(), args);
} catch (Exception e) {
LOGGER.error("Error in running Phoenix Canary Test tool. " + e);
}
LOGGER.info("Exiting Phoenix Canary Test tool...");
}
}