blob: 86931da55ed8ddc6417810df0464a555f0664add [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.query.cq.internal;
import org.apache.logging.log4j.Logger;
import org.apache.geode.StatisticDescriptor;
import org.apache.geode.Statistics;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.StatisticsType;
import org.apache.geode.StatisticsTypeFactory;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.internal.DefaultQueryService;
import org.apache.geode.cache.query.internal.cq.CqService;
import org.apache.geode.internal.NanoTimer;
import org.apache.geode.internal.cache.FilterProfile;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.statistics.StatisticsTypeFactoryImpl;
/**
* This class tracks GemFire statistics related to CqService. Specifically the following statistics
* are tracked: Number of CQs created Number of active CQs Number of CQs suspended or stopped Number
* of CQs closed Number of CQs on a client
*
* @since GemFire 5.5
*/
public class CqServiceVsdStats {
private static final Logger logger = LogService.getLogger();
/** The <code>StatisticsType</code> of the statistics */
private static final StatisticsType _type;
/** Name of the created CQs statistic */
private static final String CQS_CREATED = "numCqsCreated";
/** Name of the active CQs statistic */
private static final String CQS_ACTIVE = "numCqsActive";
/** Name of the stopped CQs statistic */
private static final String CQS_STOPPED = "numCqsStopped";
/** Name of the closed CQs statistic */
private static final String CQS_CLOSED = "numCqsClosed";
/** Name of the client's CQs statistic */
private static final String CQS_ON_CLIENT = "numCqsOnClient";
/** Number of clients with CQs statistic */
private static final String CLIENTS_WITH_CQS = "numClientsWithCqs";
/** CQ query execution time. */
private static final String CQ_QUERY_EXECUTION_TIME = "cqQueryExecutionTime";
/** CQ query execution in progress */
private static final String CQ_QUERY_EXECUTION_IN_PROGRESS = "cqQueryExecutionInProgress";
/** Completed CQ query executions */
private static final String CQ_QUERY_EXECUTIONS_COMPLETED = "cqQueryExecutionsCompleted";
/** Unique CQs, number of different CQ queries */
private static final String UNIQUE_CQ_QUERY = "numUniqueCqQuery";
/** Id of the CQs created statistic */
private static final int _numCqsCreatedId;
/** Id of the active CQs statistic */
private static final int _numCqsActiveId;
/** Id of the stopped CQs statistic */
private static final int _numCqsStoppedId;
/** Id of the closed CQs statistic */
private static final int _numCqsClosedId;
/** Id of the CQs on client statistic */
private static final int _numCqsOnClientId;
/** Id of the Clients with Cqs statistic */
private static final int _numClientsWithCqsId;
/** Id for the CQ query execution time. */
private static final int _cqQueryExecutionTimeId;
/** Id for the CQ query execution in progress */
private static final int _cqQueryExecutionInProgressId;
/** Id for completed CQ query executions */
private static final int _cqQueryExecutionsCompletedId;
/** Id for unique CQs, difference in CQ queries */
private static final int _numUniqueCqQuery;
/*
* Static initializer to create and initialize the <code>StatisticsType</code>
*/
static {
String statName = "CqServiceStats";
StatisticsTypeFactory f = StatisticsTypeFactoryImpl.singleton();
_type = f.createType(statName, statName,
new StatisticDescriptor[] {
f.createLongCounter(CQS_CREATED, "Number of CQs created.", "operations"),
f.createLongCounter(CQS_ACTIVE, "Number of CQS actively executing.", "operations"),
f.createLongCounter(CQS_STOPPED, "Number of CQs stopped.", "operations"),
f.createLongCounter(CQS_CLOSED, "Number of CQs closed.", "operations"),
f.createLongCounter(CQS_ON_CLIENT, "Number of CQs on the client.", "operations"),
f.createLongCounter(CLIENTS_WITH_CQS, "Number of Clients with CQs.", "operations"),
f.createLongCounter(CQ_QUERY_EXECUTION_TIME, "Time taken for CQ Query Execution.",
"nanoseconds"),
f.createLongCounter(CQ_QUERY_EXECUTIONS_COMPLETED, "Number of CQ Query Executions.",
"operations"),
f.createIntGauge(CQ_QUERY_EXECUTION_IN_PROGRESS, "CQ Query Execution In Progress.",
"operations"),
f.createIntGauge(UNIQUE_CQ_QUERY, "Number of Unique CQ Querys.", "Queries"),
});
// Initialize id fields
_numCqsCreatedId = _type.nameToId(CQS_CREATED);
_numCqsActiveId = _type.nameToId(CQS_ACTIVE);
_numCqsStoppedId = _type.nameToId(CQS_STOPPED);
_numCqsClosedId = _type.nameToId(CQS_CLOSED);
_numCqsOnClientId = _type.nameToId(CQS_ON_CLIENT);
_numClientsWithCqsId = _type.nameToId(CLIENTS_WITH_CQS);
_cqQueryExecutionTimeId = _type.nameToId(CQ_QUERY_EXECUTION_TIME);
_cqQueryExecutionsCompletedId = _type.nameToId(CQ_QUERY_EXECUTIONS_COMPLETED);
_cqQueryExecutionInProgressId = _type.nameToId(CQ_QUERY_EXECUTION_IN_PROGRESS);
_numUniqueCqQuery = _type.nameToId(UNIQUE_CQ_QUERY);
}
/** The <code>Statistics</code> instance to which most behavior is delegated */
private final Statistics _stats;
/**
* Constructor.
*
* @param factory The <code>StatisticsFactory</code> which creates the <code>Statistics</code>
* instance
*/
CqServiceVsdStats(StatisticsFactory factory) {
this._stats = factory.createAtomicStatistics(_type, "CqServiceStats");
}
/**
* Closes the <code>HARegionQueueStats</code>.
*/
public void close() {
this._stats.close();
}
/**
* Returns the current value of the "numCqsCreated" stat.
*
* @return the current value of the "numCqsCreated" stat
*/
long getNumCqsCreated() {
return this._stats.getLong(_numCqsCreatedId);
}
/**
* Increments the "numCqsCreated" stat by 1.
*/
void incCqsCreated() {
this._stats.incLong(_numCqsCreatedId, 1);
}
/**
* Returns the current value of the "numCqsActive" stat.
*
* @return the current value of the "numCqsActive" stat
*/
long getNumCqsActive() {
return this._stats.getLong(_numCqsActiveId);
}
/**
* Increments the "numCqsActive" stat by 1.
*/
void incCqsActive() {
this._stats.incLong(_numCqsActiveId, 1);
}
/**
* Decrements the "numCqsActive" stat by 1.
*/
void decCqsActive() {
this._stats.incLong(_numCqsActiveId, -1);
}
/**
* Returns the current value of the "numCqsStopped" stat.
*
* @return the current value of the "numCqsStopped" stat
*/
long getNumCqsStopped() {
return this._stats.getLong(_numCqsStoppedId);
}
/**
* Increments the "numCqsStopped" stat by 1.
*/
void incCqsStopped() {
this._stats.incLong(_numCqsStoppedId, 1);
}
/**
* Decrements the "numCqsStopped" stat by 1.
*/
void decCqsStopped() {
this._stats.incLong(_numCqsStoppedId, -1);
}
/**
* Returns the current value of the "numCqsClosed" stat.
*
* @return the current value of the "numCqsClosed" stat
*/
long getNumCqsClosed() {
return this._stats.getLong(_numCqsClosedId);
}
/**
* Increments the "numCqsClosed" stat by 1.
*/
void incCqsClosed() {
this._stats.incLong(_numCqsClosedId, 1);
}
/**
* Returns the current value of the "numCqsOnClient" stat.
*
* @return the current value of the "numCqsOnClient" stat
*/
long getNumCqsOnClient() {
return this._stats.getLong(_numCqsOnClientId);
}
/**
* Increments the "numCqsOnClient" stat by 1.
*/
void incCqsOnClient() {
this._stats.incLong(_numCqsOnClientId, 1);
}
/**
* Decrements the "numCqsOnClient" stat by 1.
*/
void decCqsOnClient() {
this._stats.incLong(_numCqsOnClientId, -1);
}
/**
* Returns the current value of the "numClientsWithCqs" stat.
*
* @return the current value of the "numClientsWithCqs" stat
*/
public long getNumClientsWithCqs() {
return this._stats.getLong(_numClientsWithCqsId);
}
/**
* Increments the "numClientsWithCqs" stat by 1.
*/
void incClientsWithCqs() {
this._stats.incLong(_numClientsWithCqsId, 1);
}
/**
* Decrements the "numCqsOnClient" stat by 1.
*/
void decClientsWithCqs() {
this._stats.incLong(_numClientsWithCqsId, -1);
}
/**
* Start the CQ Query Execution time.
*/
long startCqQueryExecution() {
this._stats.incInt(_cqQueryExecutionInProgressId, 1);
return NanoTimer.getTime();
}
/**
* End CQ Query Execution Time.
*
* @param start long time value.
*/
void endCqQueryExecution(long start) {
long ts = NanoTimer.getTime();
this._stats.incLong(_cqQueryExecutionTimeId, ts - start);
this._stats.incInt(_cqQueryExecutionInProgressId, -1);
this._stats.incLong(_cqQueryExecutionsCompletedId, 1);
}
/**
* Returns the total time spent executing the CQ Queries.
*
* @return long time spent.
*/
public long getCqQueryExecutionTime() {
return this._stats.getLong(_cqQueryExecutionTimeId);
}
/**
* Increments number of Unique queries.
*/
void incUniqueCqQuery() {
this._stats.incInt(_numUniqueCqQuery, 1);
}
/**
* Decrements number of unique Queries.
*/
void decUniqueCqQuery() {
this._stats.incInt(_numUniqueCqQuery, -1);
}
/**
* This is a test method. It silently ignores exceptions and should not be used outside of unit
* tests.
* <p>
* Returns the number of CQs (active + suspended) on the given region.
*/
public long numCqsOnRegion(final InternalCache cache, String regionName) {
if (cache == null) {
return 0;
}
DefaultQueryService queryService = (DefaultQueryService) cache.getQueryService();
CqService cqService = null;
try {
cqService = queryService.getCqService();
} catch (CqException e) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to get CqService {}", e.getLocalizedMessage());
}
e.printStackTrace();
return -1; // We're confused
}
if (((CqServiceImpl) cqService).isServer()) {
// If we are on the server, look at the number of CQs in the filter profile.
try {
FilterProfile fp = cache.getFilterProfile(regionName);
if (fp == null) {
return 0;
}
return fp.getCqCount();
} catch (Exception ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to get serverside CQ count for region: {} {}", regionName,
ex.getLocalizedMessage());
}
}
} else {
try {
CqQuery[] cqs = queryService.getCqs(regionName);
if (cqs != null) {
return cqs.length;
}
} catch (Exception ex) {
// Dont do anything.
}
}
return 0;
}
}