blob: 6a3117595dbcdd781441f6e0b29b47667c243063 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.cq;
import org.apache.iotdb.commons.concurrent.WrappedRunnable;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.selectinto.InsertTabletPlansIterator;
import org.apache.iotdb.db.exception.ContinuousQueryException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
import org.apache.iotdb.db.qp.strategy.LogicalGenerator;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.basic.ServiceProvider;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class ContinuousQueryTask extends WrappedRunnable {
protected static final Logger LOGGER = LoggerFactory.getLogger(ContinuousQueryTask.class);
protected static final Pattern PATH_NODE_NAME_PATTERN = Pattern.compile("\\$\\{\\w+}");
protected static final int EXECUTION_BATCH_SIZE = IoTDBConstant.DEFAULT_FETCH_SIZE;
protected final ServiceProvider serviceProvider;
// To save the continuous query info
protected final CreateContinuousQueryPlan continuousQueryPlan;
// Next timestamp to execute a query
protected final long windowEndTimestamp;
public ContinuousQueryTask(
CreateContinuousQueryPlan continuousQueryPlan, long windowEndTimestamp) {
this.continuousQueryPlan = continuousQueryPlan;
this.windowEndTimestamp = windowEndTimestamp;
serviceProvider = IoTDB.serviceProvider;
}
@Override
public void runMayThrow()
throws QueryProcessException, StorageEngineException, IOException, InterruptedException,
QueryFilterOptimizationException, MetadataException, TException, SQLException {
// construct logical operator
final String sql = generateSQL();
Operator operator = LogicalGenerator.generate(sql, ZoneId.systemDefault());
if (!operator.isQuery()) {
throw new ContinuousQueryException(
String.format("unsupported operation in cq task: %s", operator.getType().name()));
}
QueryOperator queryOperator = (QueryOperator) operator;
// construct query plan
final GroupByTimePlan queryPlan =
(GroupByTimePlan) serviceProvider.getPlanner().operatorToPhysicalPlan(queryOperator);
if (queryPlan.getDeduplicatedPaths().isEmpty()) {
if (continuousQueryPlan.isDebug()) {
LOGGER.info(continuousQueryPlan.getContinuousQueryName() + ": deduplicated paths empty.");
}
return;
}
// construct query dataset
final long queryId = ServiceProvider.SESSION_MANAGER.requestQueryId(true);
try {
final QueryContext queryContext =
serviceProvider.genQueryContext(
queryId,
queryPlan.isDebug(),
System.currentTimeMillis(),
sql,
IoTDBConstant.DEFAULT_CONNECTION_TIMEOUT_MS);
final QueryDataSet queryDataSet =
serviceProvider.createQueryDataSet(queryContext, queryPlan, EXECUTION_BATCH_SIZE);
if (queryDataSet == null || queryDataSet.getPaths().size() == 0) {
if (continuousQueryPlan.isDebug()) {
LOGGER.info(continuousQueryPlan.getContinuousQueryName() + ": query result empty.");
}
return;
}
// insert data into target timeseries
doInsert(sql, queryOperator, queryPlan, queryDataSet);
} finally {
ServiceProvider.SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
}
}
protected String generateSQL() {
return continuousQueryPlan.getQuerySqlBeforeGroupByClause()
+ "group by (["
+ (windowEndTimestamp - continuousQueryPlan.getForInterval())
+ ','
+ windowEndTimestamp
+ "),"
+ continuousQueryPlan.getGroupByTimeIntervalString()
+ ") "
+ (continuousQueryPlan.getQuerySqlAfterGroupByClause().equals("") ? "" : ", ")
+ continuousQueryPlan.getQuerySqlAfterGroupByClause();
}
protected void doInsert(
String sql, QueryOperator queryOperator, GroupByTimePlan queryPlan, QueryDataSet queryDataSet)
throws MetadataException, QueryProcessException, StorageEngineException, IOException {
InsertTabletPlansIterator insertTabletPlansIterator =
new InsertTabletPlansIterator(
queryPlan,
queryDataSet,
queryOperator.getFromComponent().getPrefixPaths().get(0),
generateTargetPaths(queryDataSet.getPaths()),
false);
while (insertTabletPlansIterator.hasNext()) {
List<InsertTabletPlan> insertTabletPlans = insertTabletPlansIterator.next();
if (insertTabletPlans.isEmpty()) {
continue;
}
if (!serviceProvider.executeNonQuery(new InsertMultiTabletsPlan(insertTabletPlans))) {
throw new ContinuousQueryException(
String.format(
"failed to execute cq task %s, sql: %s",
continuousQueryPlan.getContinuousQueryName(), sql));
}
}
}
protected List<PartialPath> generateTargetPaths(List<Path> rawPaths) throws IllegalPathException {
List<PartialPath> targetPaths = new ArrayList<>(rawPaths.size());
for (Path rawPath : rawPaths) {
targetPaths.add(new PartialPath(fillTargetPathTemplate((PartialPath) rawPath)));
}
return targetPaths;
}
protected String fillTargetPathTemplate(PartialPath rawPath) throws IllegalPathException {
String fullPath = rawPath.getFullPath();
int indexOfLeftBracket = fullPath.indexOf("(");
if (indexOfLeftBracket != -1) {
fullPath = fullPath.substring(indexOfLeftBracket + 1);
}
int indexOfRightBracket = fullPath.lastIndexOf(")");
if (indexOfRightBracket != -1) {
fullPath = fullPath.substring(0, indexOfRightBracket);
}
String[] nodes = new PartialPath(fullPath).getNodes();
StringBuffer sb = new StringBuffer();
Matcher m =
PATH_NODE_NAME_PATTERN.matcher(this.continuousQueryPlan.getTargetPath().getFullPath());
while (m.find()) {
String param = m.group();
String value = nodes[Integer.parseInt(param.substring(2, param.length() - 1).trim())];
m.appendReplacement(sb, value == null ? "" : value);
}
m.appendTail(sb);
return sb.toString();
}
public void onRejection() {
LOGGER.warn(
"continuous query task {} was rejected, sql: {}",
continuousQueryPlan.getContinuousQueryName(),
generateSQL());
}
}