blob: 23d5eaaf96f097d1301808a1a5e3913709149ca9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.vxquery.cli;
import static org.apache.vxquery.rest.Constants.HttpHeaderValues.CONTENT_TYPE_JSON;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.logging.LogManager;
import javax.xml.bind.JAXBException;
import org.apache.commons.io.FileUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.HttpClientUtils;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.vxquery.app.util.LocalClusterUtil;
import org.apache.vxquery.app.util.RestUtils;
import org.apache.vxquery.rest.request.QueryRequest;
import org.apache.vxquery.rest.response.AsyncQueryResponse;
import org.apache.vxquery.rest.response.Error;
import org.apache.vxquery.rest.response.ErrorResponse;
import org.apache.vxquery.rest.response.Metrics;
import org.apache.vxquery.rest.response.SyncQueryResponse;
import org.apache.vxquery.rest.service.VXQueryConfig;
import org.kohsuke.args4j.Argument;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
/**
* CLI for VXQuery. This class is using the REST API to execute statements given by the user.
*
* @author Erandi Ganepola
*/
public class VXQuery {
private final CmdLineOptions opts;
private static LocalClusterUtil localClusterUtil;
private String restIpAddress;
private int restPort;
private static List<Metrics> metricsList = new ArrayList<>();
private int executionIteration;
/**
* Constructor to use command line options passed.
*
* @param opts
* Command line options object
*/
public VXQuery(CmdLineOptions opts) {
this.opts = opts;
}
/**
* Main method to get command line options and execute query process.
*
* @param args
* command line arguments
*/
public static void main(String[] args) {
LogManager.getLogManager().reset();
final CmdLineOptions opts = new CmdLineOptions();
CmdLineParser parser = new CmdLineParser(opts);
// parse command line options, give error message if no arguments passed
try {
parser.parseArgument(args);
} catch (Exception e) {
parser.printUsage(System.err);
return;
}
if (opts.xqFiles.isEmpty()) {
parser.printUsage(System.err);
return;
}
VXQuery vxq = new VXQuery(opts);
vxq.execute(opts.xqFiles);
}
private void execute(List<String> xqFiles) {
if (opts.restIpAddress == null) {
System.out.println("No REST Ip address given. Creating a local hyracks cluster");
VXQueryConfig vxqConfig = new VXQueryConfig();
vxqConfig.setAvailableProcessors(opts.availableProcessors);
vxqConfig.setFrameSize(opts.frameSize);
vxqConfig.setHdfsConf(opts.hdfsConf);
vxqConfig.setJoinHashSize(opts.joinHashSize);
vxqConfig.setMaximumDataSize(opts.maximumDataSize);
localClusterUtil = new LocalClusterUtil();
try {
localClusterUtil.init(vxqConfig);
restIpAddress = localClusterUtil.getIpAddress();
restPort = localClusterUtil.getRestPort();
} catch (Exception e) {
System.err.println("Unable to start local hyracks cluster due to: " + e.getMessage());
e.printStackTrace();
return;
}
} else {
restIpAddress = opts.restIpAddress;
restPort = opts.restPort;
}
System.out.println("Running queries given in: " + Arrays.toString(xqFiles.toArray()));
runQueries(xqFiles);
if (localClusterUtil != null) {
try {
localClusterUtil.deinit();
} catch (Exception e) {
System.err.println("Error occurred when stopping local hyracks: " + e.getMessage());
}
}
}
public void runQueries(List<String> xqFiles) {
for (String xqFile : xqFiles) {
String query;
try {
query = slurp(xqFile);
} catch (IOException e) {
System.err.println(String.format("Error occurred when reading XQuery file %s with message: %s", xqFile,
e.getMessage()));
continue;
}
System.out.println();
System.out.println("====================================================");
System.out.println("\tQuery - '" + xqFile + "'");
System.out.println("====================================================");
QueryRequest request = createQueryRequest(opts, query);
metricsList.clear();
for (int i = 0; i < opts.repeatExec; i++) {
System.out.println("**** Repetition : " + (i + 1) + " ****");
executionIteration = i;
sendQueryRequest(xqFile, request, this);
}
if (opts.repeatExec > 1) {
showTimingSummary();
}
}
}
private void onSuccess(String xqFile, QueryRequest request, SyncQueryResponse response) {
if (response == null) {
System.err.println(String.format("Unable to execute query %s", request.getStatement()));
return;
}
if (opts.showQuery) {
printField("Query", response.getStatement());
}
if (request.isShowMetrics()) {
String metrics = String.format("Compile Time:\t%d\nElapsed Time:\t%d",
response.getMetrics().getCompileTime(), response.getMetrics().getElapsedTime());
printField("Metrics", metrics);
}
if (request.isShowAbstractSyntaxTree()) {
printField("Abstract Syntax Tree", response.getAbstractSyntaxTree());
}
if (request.isShowTranslatedExpressionTree()) {
printField("Translated Expression Tree", response.getTranslatedExpressionTree());
}
if (request.isShowOptimizedExpressionTree()) {
printField("Optimized Expression Tree", response.getOptimizedExpressionTree());
}
if (request.isShowRuntimePlan()) {
printField("Runtime Plan", response.getRuntimePlan());
}
printField("Results", response.getResults());
if (executionIteration >= opts.timingIgnoreQueries) {
metricsList.add(response.getMetrics());
}
}
private void onFailure(String xqFile, ErrorResponse response) {
if (response == null) {
System.err.println(String.format("Unable to execute query in %s", xqFile));
return;
}
System.err.println();
System.err.println("------------------------ Errors ---------------------");
Error error = response.getError();
String errorMsg = String.format("Code:\t %d\nMessage:\t %s", error.getCode(), error.getMessage());
printField(System.err, String.format("Errors for '%s'", xqFile), errorMsg);
}
/**
* Submits a query to be executed by the REST API. Will call {@link #onFailure(String, ErrorResponse)} if any error
* occurs when submitting the query. Else will call {@link #onSuccess(String, QueryRequest, SyncQueryResponse)} with
* the {@link AsyncQueryResponse}
*
* @param xqFile
* .xq file with the query to be executed
* @param request
* {@link QueryRequest} instance to be submitted to REST API
* @param cli
* cli class instance
*/
private static void sendQueryRequest(String xqFile, QueryRequest request, VXQuery cli) {
URI uri = null;
try {
uri = RestUtils.buildQueryURI(request, cli.restIpAddress, cli.restPort);
} catch (URISyntaxException e) {
System.err.println(
String.format("Unable to build URI to call REST API for query: %s", request.getStatement()));
cli.onFailure(xqFile, null);
}
CloseableHttpClient httpClient = HttpClients.custom().build();
try {
HttpGet httpGet = new HttpGet(uri);
httpGet.setHeader(HttpHeaders.ACCEPT, CONTENT_TYPE_JSON);
try (CloseableHttpResponse httpResponse = httpClient.execute(httpGet)) {
HttpEntity entity = httpResponse.getEntity();
String response = RestUtils.readEntity(entity);
if (httpResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
cli.onSuccess(xqFile, request,
RestUtils.mapEntity(response, SyncQueryResponse.class, CONTENT_TYPE_JSON));
} else {
cli.onFailure(xqFile, RestUtils.mapEntity(response, ErrorResponse.class, CONTENT_TYPE_JSON));
}
} catch (IOException e) {
System.err.println("Error occurred when reading entity: " + e.getMessage());
cli.onFailure(xqFile, null);
} catch (JAXBException e) {
System.err.println("Error occurred when mapping query response: " + e.getMessage());
cli.onFailure(xqFile, null);
}
} finally {
HttpClientUtils.closeQuietly(httpClient);
}
}
/**
* Once the query in a given .xq file has been executed (with repeated executions as well), this method calculates
* mean, standard deviation, minimum and maximum execution times.
*/
private void showTimingSummary() {
double sumTime = 0;
double sumSquaredTime = 0;
long minTime = Long.MAX_VALUE;
long maxTime = Long.MIN_VALUE;
for (int i = 0; i < metricsList.size(); i++) {
Metrics metrics = metricsList.get(i);
long totalTime = metrics.getCompileTime() + metrics.getElapsedTime();
sumTime += totalTime;
sumSquaredTime += totalTime * totalTime;
if (totalTime < minTime) {
minTime = totalTime;
}
if (totalTime > maxTime) {
maxTime = totalTime;
}
}
double mean = sumTime / (opts.repeatExec - opts.timingIgnoreQueries);
double sd = Math.sqrt(sumSquaredTime / (opts.repeatExec - opts.timingIgnoreQueries) - mean * mean);
System.out.println();
System.out.println("\t**** Timing Summary ****");
System.out.println("----------------------------------------------------");
System.out.println(String.format("Repetitions:\t%d, Timing Ignored Iterations:\t%d", opts.repeatExec,
opts.timingIgnoreQueries));
System.out.println("Average execution time:\t" + mean + " ms");
System.out.println("Standard deviation:\t" + String.format("%.4f", sd));
System.out.println("Coefficient of variation:\t" + String.format("%.4f", sd / mean));
System.out.println("Minimum execution time:\t" + minTime + " ms");
System.out.println("Maximum execution time:\t" + maxTime + " ms");
System.out.println();
}
private static QueryRequest createQueryRequest(CmdLineOptions opts, String query) {
QueryRequest request = new QueryRequest(query);
request.setCompileOnly(opts.compileOnly);
request.setOptimization(opts.optimizationLevel);
request.setFrameSize(opts.frameSize);
request.setRepeatExecutions(opts.repeatExec);
request.setShowMetrics(opts.timing);
request.setShowAbstractSyntaxTree(opts.showAST);
request.setShowTranslatedExpressionTree(opts.showTET);
request.setShowOptimizedExpressionTree(opts.showOET);
request.setShowRuntimePlan(opts.showRP);
request.setAsync(false);
return request;
}
/**
* Reads the contents of file given in query into a String. The file is always closed. For XML files UTF-8 encoding
* is used.
*
* @param query
* The query with filename to be processed
* @return UTF-8 formatted query string
* @throws IOException
*/
private static String slurp(String query) throws IOException {
return FileUtils.readFileToString(new File(query), "UTF-8");
}
private static void printField(PrintStream out, String field, String value) {
out.println();
field = field + ":";
out.print(field);
String[] lines = value.split("\n");
for (int i = 0; i < lines.length; i++) {
int margin = 4;
if (i != 0) {
margin += field.length();
}
System.out.print(String.format("%1$" + margin + "s%2$s\n", "", lines[i]));
}
}
private static void printField(String field, String value) {
printField(System.out, field, value);
}
/**
* Helper class with fields and methods to handle all command line options
*/
private static class CmdLineOptions {
@Option(name = "-rest-ip-address", usage = "IP Address of the REST Server")
private String restIpAddress = null;
@Option(name = "-rest-port", usage = "Port of REST Server")
private int restPort = 8085;
@Option(name = "-compileonly", usage = "Compile the query and stop.")
private boolean compileOnly;
@Option(name = "-O", usage = "Optimization Level. (default: Full Optimization)")
private int optimizationLevel = Integer.MAX_VALUE;
@Option(name = "-frame-size", usage = "Frame size in bytes. (default: 65,536)")
private int frameSize = 65536;
@Option(name = "-repeatexec", usage = "Number of times to repeat execution.")
private int repeatExec = 1;
@Option(name = "-timing", usage = "Produce timing information.")
private boolean timing;
@Option(name = "-showquery", usage = "Show query string.")
private boolean showQuery;
@Option(name = "-showast", usage = "Show abstract syntax tree.")
private boolean showAST;
@Option(name = "-showtet", usage = "Show translated expression tree.")
private boolean showTET;
@Option(name = "-showoet", usage = "Show optimized expression tree.")
private boolean showOET;
@Option(name = "-showrp", usage = "Show Runtime plan.")
private boolean showRP;
// Optional (Not supported by REST API) parameters. Only used for creating a
// local hyracks cluster
@Option(name = "-join-hash-size", usage = "Join hash size in bytes. (default: 67,108,864)")
private long joinHashSize = -1;
@Option(name = "-maximum-data-size", usage = "Maximum possible data size in bytes. (default: 150,323,855,000)")
private long maximumDataSize = -1;
@Option(name = "-buffer-size", usage = "Disk read buffer size in bytes.")
private int bufferSize = -1;
@Option(name = "-result-file", usage = "File path to save the query result.")
private String resultFile = null;
@Option(name = "-timing-ignore-queries", usage = "Ignore the first X number of quereies.")
private int timingIgnoreQueries = 0;
@Option(name = "-hdfs-conf", usage = "Directory path to Hadoop configuration files")
private String hdfsConf = null;
@Option(name = "-available-processors", usage = "Number of available processors. (default: java's available processors)")
private int availableProcessors = -1;
@Option(name = "-local-node-controllers", usage = "Number of local node controllers. (default: 1)")
private int localNodeControllers = 1;
@Argument
private List<String> xqFiles = new ArrayList<>();
}
}