blob: 9ba793803bed5992376f32bdd74cc8f04c3f04c6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.configuration.SqlConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.systemview.walker.SqlQueryHistoryViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.SqlQueryViewWalker;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.systemview.view.SqlQueryHistoryView;
import org.apache.ignite.spi.systemview.view.SqlQueryView;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
/**
* Keep information about all running queries.
*/
public class RunningQueryManager {
/** Name of the MetricRegistry which metrics measure stats of queries initiated by user. */
public static final String SQL_USER_QUERIES_REG_NAME = "sql.queries.user";
/** */
public static final String SQL_QRY_VIEW = metricName("sql", "queries");
/** */
public static final String SQL_QRY_VIEW_DESC = "Running SQL queries.";
/** */
public static final String SQL_QRY_HIST_VIEW = metricName("sql", "queries", "history");
/** */
public static final String SQL_QRY_HIST_VIEW_DESC = "SQL queries history.";
/** Keep registered user queries. */
private final ConcurrentMap<Long, GridRunningQueryInfo> runs = new ConcurrentHashMap<>();
/** Unique id for queries on single node. */
private final AtomicLong qryIdGen = new AtomicLong();
/** Local node ID. */
private final UUID localNodeId;
/** History size. */
private final int histSz;
/** Query history tracker. */
private volatile QueryHistoryTracker qryHistTracker;
/** Number of successfully executed queries. */
private final LongAdderMetric successQrsCnt;
/** Number of failed queries in total by any reason. */
private final AtomicLongMetric failedQrsCnt;
/**
* Number of canceled queries. Canceled queries a treated as failed and counting twice: here and in {@link
* #failedQrsCnt}.
*/
private final AtomicLongMetric canceledQrsCnt;
/**
* Constructor.
*
* @param ctx Context.
*/
public RunningQueryManager(GridKernalContext ctx) {
localNodeId = ctx.localNodeId();
histSz = ctx.config().getSqlConfiguration().getSqlQueryHistorySize();
qryHistTracker = new QueryHistoryTracker(histSz);
ctx.systemView().registerView(SQL_QRY_VIEW, SQL_QRY_VIEW_DESC,
new SqlQueryViewWalker(),
runs.values(),
SqlQueryView::new);
ctx.systemView().registerView(SQL_QRY_HIST_VIEW, SQL_QRY_HIST_VIEW_DESC,
new SqlQueryHistoryViewWalker(),
qryHistTracker.queryHistory().values(),
SqlQueryHistoryView::new);
MetricRegistry userMetrics = ctx.metric().registry(SQL_USER_QUERIES_REG_NAME);
successQrsCnt = userMetrics.longAdderMetric("success",
"Number of successfully executed user queries that have been started on this node.");
failedQrsCnt = userMetrics.longMetric("failed", "Total number of failed by any reason (cancel, etc)" +
" queries that have been started on this node.");
canceledQrsCnt = userMetrics.longMetric("canceled", "Number of canceled queries that have been started " +
"on this node. This metric number included in the general 'failed' metric.");
}
/**
* Register running query.
*
* @param qry Query text.
* @param qryType Query type.
* @param schemaName Schema name.
* @param loc Local query flag.
* @param cancel Query cancel. Should be passed in case query is cancelable, or {@code null} otherwise.
* @return Id of registered query.
*/
public Long register(String qry, GridCacheQueryType qryType, String schemaName, boolean loc,
@Nullable GridQueryCancel cancel) {
long qryId = qryIdGen.incrementAndGet();
GridRunningQueryInfo run = new GridRunningQueryInfo(
qryId,
localNodeId,
qry,
qryType,
schemaName,
System.currentTimeMillis(),
cancel,
loc
);
GridRunningQueryInfo preRun = runs.putIfAbsent(qryId, run);
assert preRun == null : "Running query already registered [prev_qry=" + preRun + ", newQry=" + run + ']';
return qryId;
}
/**
* Unregister running query.
*
* @param qryId id of the query, which is given by {@link #register register} method.
* @param failReason exception that caused query execution fail, or {@code null} if query succeded.
*/
public void unregister(Long qryId, @Nullable Throwable failReason) {
if (qryId == null)
return;
boolean failed = failReason != null;
GridRunningQueryInfo qry = runs.remove(qryId);
// Attempt to unregister query twice.
if (qry == null)
return;
//We need to collect query history and metrics only for SQL queries.
if (isSqlQuery(qry)) {
qry.runningFuture().onDone();
qryHistTracker.collectHistory(qry, failed);
if (!failed)
successQrsCnt.increment();
else {
failedQrsCnt.increment();
// We measure cancel metric as "number of times user's queries ended up with query cancelled exception",
// not "how many user's KILL QUERY command succeeded". These may be not the same if cancel was issued
// right when query failed due to some other reason.
if (QueryUtils.wasCancelled(failReason))
canceledQrsCnt.increment();
}
}
}
/**
* Return SQL queries which executing right now.
*
* @return List of SQL running queries.
*/
public List<GridRunningQueryInfo> runningSqlQueries() {
List<GridRunningQueryInfo> res = new ArrayList<>();
for (GridRunningQueryInfo run : runs.values()) {
if (isSqlQuery(run))
res.add(run);
}
return res;
}
/**
* Check belongs running query to an SQL type.
*
* @param runningQryInfo Running query info object.
* @return {@code true} For SQL or SQL_FIELDS query type.
*/
private boolean isSqlQuery(GridRunningQueryInfo runningQryInfo) {
return runningQryInfo.queryType() == SQL_FIELDS || runningQryInfo.queryType() == SQL;
}
/**
* Return long running user queries.
*
* @param duration Duration of long query.
* @return Collection of queries which running longer than given duration.
*/
public Collection<GridRunningQueryInfo> longRunningQueries(long duration) {
Collection<GridRunningQueryInfo> res = new ArrayList<>();
long curTime = System.currentTimeMillis();
for (GridRunningQueryInfo runningQryInfo : runs.values()) {
if (runningQryInfo.longQuery(curTime, duration))
res.add(runningQryInfo);
}
return res;
}
/**
* Cancel query.
*
* @param qryId Query id.
*/
public void cancel(Long qryId) {
GridRunningQueryInfo run = runs.get(qryId);
if (run != null)
run.cancel();
}
/**
* Cancel all executing queries and deregistering all of them.
*/
public void stop() {
Iterator<GridRunningQueryInfo> iter = runs.values().iterator();
while (iter.hasNext()) {
try {
GridRunningQueryInfo r = iter.next();
iter.remove();
r.cancel();
}
catch (Exception ignore) {
// No-op.
}
}
}
/**
* Gets query history statistics. Size of history could be configured via {@link
* SqlConfiguration#setSqlQueryHistorySize(int)}
*
* @return Queries history statistics aggregated by query text, schema and local flag.
*/
public Map<QueryHistoryKey, QueryHistory> queryHistoryMetrics() {
return qryHistTracker.queryHistory();
}
/**
* Gets info about running query by their id.
* @param qryId
* @return Running query info or {@code null} in case no running query for given id.
*/
public @Nullable GridRunningQueryInfo runningQueryInfo(Long qryId) {
return runs.get(qryId);
}
/**
* Reset query history.
*/
public void resetQueryHistoryMetrics() {
qryHistTracker = new QueryHistoryTracker(histSz);
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(RunningQueryManager.class, this);
}
}