/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.pherf.workload;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.Callable;
import org.apache.phoenix.pherf.result.DataModelResult;
import org.apache.phoenix.pherf.result.ResultManager;
import org.apache.phoenix.pherf.result.RunTime;
import org.apache.phoenix.pherf.result.ThreadTime;
import org.apache.phoenix.pherf.rules.RulesApplier;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.phoenix.pherf.PherfConstants.GeneratePhoenixStats;
import org.apache.phoenix.pherf.configuration.Query;
import org.apache.phoenix.pherf.configuration.Scenario;
import org.apache.phoenix.pherf.configuration.WriteParams;
import org.apache.phoenix.pherf.configuration.XMLConfigParser;
import org.apache.phoenix.pherf.util.PhoenixUtil;
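
/**
* Runs a single query repeatedly on its own thread and records per-run timings.
* Each timed run is synchronized on the shared WorkloadExecutor, and intermediate results
* are written at most once per second across all runner threads.
*/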
class MultiThreadedRunner implements Callable<Void> {
private static final Logger LOGGER = LoggerFactory.getLogger(MultiThreadedRunner.class);
private Query query;
private ThreadTime threadTime;
private PhoenixUtil pUtil = PhoenixUtil.create();
private String threadName;
private DataModelResult dataModelResult;
private long numberOfExecutions;
private long executionDurationInMs;
private static long lastResultWritten = EnvironmentEdgeManager.currentTimeMillis() - 1000;
private final ResultManager resultManager;
private final RulesApplier ruleApplier;
private final Scenario scenario;
private final WorkloadExecutor workloadExecutor;
private final XMLConfigParser parser;

/**
* MultiThreadedRunner
*
* @param threadName name of the thread executing this runner
* @param query query to execute and time
* @param dataModelResult result holder for the data model being exercised
* @param threadTime per-thread timing results
* @param numberOfExecutions maximum number of times to execute the query
* @param executionDurationInMs maximum duration to keep executing, in milliseconds
* @param writeRuntimeResults whether intermediate runtime results should be written
* @param ruleApplier rules applier used to generate dynamic statement values
* @param scenario scenario being executed
* @param workloadExecutor executor shared by runner threads and write workloads
* @param parser XML configuration parser for the scenario
*/
MultiThreadedRunner(String threadName, Query query, DataModelResult dataModelResult,
ThreadTime threadTime, long numberOfExecutions, long executionDurationInMs,
boolean writeRuntimeResults, RulesApplier ruleApplier, Scenario scenario,
WorkloadExecutor workloadExecutor, XMLConfigParser parser) {
this.query = query;
this.threadName = threadName;
this.threadTime = threadTime;
this.dataModelResult = dataModelResult;
this.numberOfExecutions = numberOfExecutions;
this.executionDurationInMs = executionDurationInMs;
this.ruleApplier = ruleApplier;
this.scenario = scenario;
this.resultManager = new ResultManager(dataModelResult.getName(), writeRuntimeResults);
this.workloadExecutor = workloadExecutor;
this.parser = parser;
}

/**
* Executes the query repeatedly until either the configured number of executions has been
* reached or the execution duration has elapsed, whichever comes first.
*/
@Override
@Override
public Void call() throws Exception {
LOGGER.info("\n\nThread Starting " + threadName + " ; " + query.getStatement() + " for "
+ numberOfExecutions + "times\n\n");
Long start = EnvironmentEdgeManager.currentTimeMillis();
for (long i = numberOfExecutions; (i > 0 && ((EnvironmentEdgeManager.currentTimeMillis() - start)
< executionDurationInMs)); i--) {
synchronized (workloadExecutor) {
timedQuery();
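// Write intermediate results at most once per second; lastResultWritten is static, so the
// throttle is shared across all runner threads.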
if ((EnvironmentEdgeManager.currentTimeMillis() - lastResultWritten) > 1000) {
resultManager.write(dataModelResult, ruleApplier);
lastResultWritten = EnvironmentEdgeManager.currentTimeMillis();
}
}
}
// Make sure all results have been dumped before exiting
synchronized (workloadExecutor) {
resultManager.flush();
}
LOGGER.info("\n\nThread exiting." + threadName + "\n\n");
return null;
}

private synchronized ThreadTime getThreadTime() {
return threadTime;
}

/**
* Executes the query once and records the elapsed time, row count and any exception.
*
* @throws Exception if the query fails; the failure is still recorded in the run times
*/
private void timedQuery() throws Exception {
boolean isSelectCountStatement = query.getStatement().toUpperCase().trim().contains("COUNT(");
Connection conn = null;
PreparedStatement statement = null;
ResultSet rs = null;
Long start = EnvironmentEdgeManager.currentTimeMillis();
Date startDate = Calendar.getInstance().getTime();
String exception = null;
long resultRowCount = 0;
try {
conn = pUtil.getConnection(query.getTenantId(), scenario.getPhoenixProperties());
conn.setAutoCommit(true);
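// Resolve the final SQL statement, applying any rule-driven dynamic substitutions for this scenario.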
final String statementString = query.getDynamicStatement(ruleApplier, scenario);
statement = conn.prepareStatement(statementString);
LOGGER.info("Executing: " + statementString);
if (scenario.getWriteParams() != null) {
Workload writes = new WriteWorkload(PhoenixUtil.create(), parser, scenario, GeneratePhoenixStats.NO);
workloadExecutor.add(writes);
}
boolean isQuery = statement.execute();
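// execute() returns true when the statement produced a ResultSet; otherwise the statement
// is treated as a write and the connection is committed below.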
if (isQuery) {
rs = statement.getResultSet();
while (rs.next()) {
if (null != query.getExpectedAggregateRowCount()) {
if (rs.getLong(1) != query.getExpectedAggregateRowCount())
throw new RuntimeException(
"Aggregate count " + rs.getLong(1) + " does not match expected "
+ query.getExpectedAggregateRowCount());
}
if (isSelectCountStatement) {
resultRowCount = rs.getLong(1);
} else {
resultRowCount++;
}
}
} else {
conn.commit();
}
} catch (Exception e) {
LOGGER.error("Exception while executing query", e);
exception = e.getMessage();
throw e;
} finally {
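// Always record the run time (including any exception) before releasing JDBC resources.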
getThreadTime().getRunTimesInMs().add(new RunTime(exception, startDate, resultRowCount,
(int) (EnvironmentEdgeManager.currentTimeMillis() - start)));
if (rs != null) rs.close();
if (statement != null) statement.close();
if (conn != null) conn.close();
}
}
}