blob: 8583d358be29342101df1197e481ba98f85ecf2a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.managers.systemview;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.managers.systemview.walker.ScanQueryViewWalker;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager.ScanQueryIterator;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.IgniteSpiCloseableIterator;
import org.apache.ignite.spi.systemview.view.ScanQueryView;
import org.apache.ignite.spi.systemview.view.SystemView;
import org.jetbrains.annotations.NotNull;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SCAN;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
/**
* {@link SystemView} implementation providing data about cache queries.
*
* @see GridCacheQueryManager#queryIterators()
* @see ScanQueryIterator
* @see ScanQueryView
*/
public class ScanQuerySystemView<K, V> extends AbstractSystemView<ScanQueryView> {
/** Scan query system view. */
public static final String SCAN_QRY_SYS_VIEW = metricName("scan", "queries");
/** Scan query system view description. */
public static final String SCAN_QRY_SYS_VIEW_DESC = "Scan queries";
/** Cache data. */
private final Collection<GridCacheContext<K, V>> cctxs;
/**
* @param cctxs Cache data.
*/
public ScanQuerySystemView(Collection<GridCacheContext<K, V>> cctxs) {
super(SCAN_QRY_SYS_VIEW, SCAN_QRY_SYS_VIEW_DESC, new ScanQueryViewWalker());
this.cctxs = cctxs;
}
/** {@inheritDoc} */
@Override public int size() {
int sz = 0;
QueryDataIterator iter = new QueryDataIterator();
//Intentionally don't call `next` method to prevent the creation of unused objects and reduce GC pressure.
while (iter.hasNext())
sz++;
return sz;
}
/** {@inheritDoc} */
@NotNull @Override public Iterator<ScanQueryView> iterator() {
return new QueryDataIterator();
}
/** Class to iterate through all {@link GridCacheQueryManager.ScanQueryIterator}. */
private class QueryDataIterator implements Iterator<ScanQueryView> {
/** Cache contexts iterator. */
private final Iterator<GridCacheContext<K, V>> cctxsIter;
/** Current cache context. */
private GridCacheContext<K, V> cctx;
/** Node requests iterator. */
private Iterator<Map.Entry<UUID, GridCacheQueryManager<K, V>.RequestFutureMap>> nodeQryIter;
/** Local query iterator. */
private Iterator<GridCacheQueryManager.ScanQueryIterator> localQryIter;
/** Current node id. */
private UUID nodeId;
/** Current requests map. */
private GridCacheQueryManager<K, V>.RequestFutureMap reqMap;
/** Request results iterator. */
private Iterator<Map.Entry<Long, GridFutureAdapter<GridCacheQueryManager.QueryResult<K, V>>>> qriesIter;
/** Current query id. */
private long qryId;
/** Current scan iterator. */
private IgniteSpiCloseableIterator<IgniteBiTuple<K, V>> qry;
/** {@code True} if {@link #hasNext()} was executed before the {@link #next()}. */
private boolean hasNextExec;
/** */
public QueryDataIterator() {
cctxsIter = cctxs.iterator();
nodeQryIter = Collections.emptyIterator();
qriesIter = Collections.emptyIterator();
localQryIter = Collections.emptyIterator();
}
/** {@inheritDoc} */
@Override public boolean hasNext() {
hasNextExec = true;
while (!nextScanIter()) {
while (!nodeQryIter.hasNext() && !localQryIter.hasNext()) {
if (!cctxsIter.hasNext())
return false;
cctx = cctxsIter.next();
GridCacheQueryManager<K, V> qryMgr = cctx.queries();
nodeQryIter = qryMgr.queryIterators().entrySet().iterator();
localQryIter = qryMgr.localQueryIterators().iterator();
}
if (nodeQryIter.hasNext()) {
Map.Entry<UUID, GridCacheQueryManager<K, V>.RequestFutureMap> next = nodeQryIter.next();
nodeId = next.getKey();
reqMap = next.getValue();
qriesIter = next.getValue().entrySet().iterator();
}
else {
nodeId = cctx.localNodeId();
reqMap = null;
qriesIter = null;
}
}
return true;
}
/** {@inheritDoc} */
@Override public ScanQueryView next() {
if (!hasNextExec && !hasNext())
throw new NoSuchElementException("No more elements.");
hasNextExec = false;
return new ScanQueryView(nodeId, qryId, reqMap != null && reqMap.isCanceled(qryId), qry);
}
/**
* @return {@code True} if next {@link GridCacheQueryManager.ScanQueryIterator} found.
*/
private boolean nextScanIter() {
try {
while (qriesIter != null && qriesIter.hasNext()) {
Map.Entry<Long, GridFutureAdapter<GridCacheQueryManager.QueryResult<K, V>>> qryRes =
qriesIter.next();
if (!qryRes.getValue().isDone() || qryRes.getValue().get().type() != SCAN)
continue;
qryId = qryRes.getKey();
qry = qryRes.getValue().get().get();
return true;
}
qriesIter = null;
reqMap = null;
if (!localQryIter.hasNext())
return false;
qryId = 0;
qry = localQryIter.next();
nodeId = cctx.localNodeId();
return true;
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
}
}