blob: 50e131e8fe12122d2c4449d8561efe27360be626 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ignite.internal.processors.cache.query;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteReducer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SCAN;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SET;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SPI;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
/**
 * Query adapter.
 */
public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/** Cache context. */
private final GridCacheContext<?, ?> cctx;
/** Query type. */
private final GridCacheQueryType type;
/** Logger. */
private final IgniteLogger log;
/** Class name in case of binary query. */
private final String clsName;
/** Query clause (excluded from non-sensitive string output). */
@GridToStringInclude(sensitive = true)
private final String clause;
/** Key-value (scan) filter. */
private final IgniteBiPredicate<Object, Object> filter;
/** Limits returned records quantity. */
private int limit;
/** Transformer. */
private IgniteClosure<?, ?> transform;
/** Partition. */
private Integer part;
/** Include metadata flag. */
private final boolean incMeta;
/** Page size, initialised to the default query page size. */
private volatile int pageSize = Query.DFLT_PAGE_SIZE;
/** Timeout. */
private volatile long timeout;
/** Include backups flag. */
private volatile boolean incBackups;
/** Local query. */
private boolean forceLocal;
/** Enable dedup flag. */
private volatile boolean dedup;
/** Grid projection. */
private volatile ClusterGroup prj;
/** Keep binary flag. */
private boolean keepBinary;
/** Security subject ID. */
private UUID subjId;
/** Task hash. */
private int taskHash;
/** MVCC snapshot. */
private MvccSnapshot mvccSnapshot;
/** Flag to enable data page scan. */
private Boolean dataPageScanEnabled;
/**
 * Creates a query adapter with an optional filter and transformer. Metadata inclusion is
 * switched off and both class name and clause are left unset.
 *
 * @param cctx Context.
 * @param type Query type.
 * @param filter Scan filter.
 * @param transform Transformer.
 * @param part Partition.
 * @param keepBinary Keep binary flag.
 * @param forceLocal Flag to force local query.
 * @param dataPageScanEnabled Flag to enable data page scan.
 */
public GridCacheQueryAdapter(
    GridCacheContext<?, ?> cctx,
    GridCacheQueryType type,
    @Nullable IgniteBiPredicate<Object, Object> filter,
    @Nullable IgniteClosure<Map.Entry, Object> transform,
    @Nullable Integer part,
    boolean keepBinary,
    boolean forceLocal,
    Boolean dataPageScanEnabled
) {
    assert cctx != null;
    assert type != null;
    assert part == null || part >= 0;

    this.cctx = cctx;
    this.type = type;
    this.filter = filter;
    this.transform = transform;
    this.part = part;
    this.keepBinary = keepBinary;
    this.forceLocal = forceLocal;
    this.dataPageScanEnabled = dataPageScanEnabled;

    log = cctx.logger(getClass());

    this.incMeta = false;
    this.clsName = null;
    this.clause = null;
}
/**
 * Creates a query adapter described by a class name and a clause.
 *
 * @param cctx Context.
 * @param type Query type.
 * @param clsName Class name.
 * @param clause Clause.
 * @param filter Scan filter.
 * @param part Partition.
 * @param incMeta Include metadata flag.
 * @param keepBinary Keep binary flag.
 * @param dataPageScanEnabled Flag to enable data page scan.
 */
public GridCacheQueryAdapter(
    GridCacheContext<?, ?> cctx,
    GridCacheQueryType type,
    @Nullable String clsName,
    @Nullable String clause,
    @Nullable IgniteBiPredicate<Object, Object> filter,
    @Nullable Integer part,
    boolean incMeta,
    boolean keepBinary,
    Boolean dataPageScanEnabled
) {
    assert cctx != null;
    assert type != null;
    assert part == null || part >= 0;

    this.cctx = cctx;
    this.type = type;
    this.clsName = clsName;
    this.clause = clause;
    this.filter = filter;
    this.part = part;
    this.incMeta = incMeta;
    this.keepBinary = keepBinary;
    this.dataPageScanEnabled = dataPageScanEnabled;

    log = cctx.logger(getClass());
}
/**
 * Creates a query adapter with every attribute supplied explicitly. No argument validation
 * is performed here.
 *
 * @param cctx Context.
 * @param type Query type.
 * @param log Logger.
 * @param pageSize Page size.
 * @param timeout Timeout.
 * @param incBackups Include backups flag.
 * @param dedup Enable dedup flag.
 * @param prj Grid projection.
 * @param filter Key-value filter.
 * @param part Partition.
 * @param clsName Class name.
 * @param clause Clause.
 * @param limit Response limit. Set to 0 for no limits.
 * @param incMeta Include metadata flag.
 * @param keepBinary Keep binary flag.
 * @param subjId Security subject ID.
 * @param taskHash Task hash.
 * @param mvccSnapshot Mvcc version.
 * @param dataPageScanEnabled Flag to enable data page scan.
 */
public GridCacheQueryAdapter(
    GridCacheContext<?, ?> cctx,
    GridCacheQueryType type,
    IgniteLogger log,
    int pageSize,
    long timeout,
    boolean incBackups,
    boolean dedup,
    ClusterGroup prj,
    IgniteBiPredicate<Object, Object> filter,
    @Nullable Integer part,
    @Nullable String clsName,
    String clause,
    int limit,
    boolean incMeta,
    boolean keepBinary,
    UUID subjId,
    int taskHash,
    MvccSnapshot mvccSnapshot,
    Boolean dataPageScanEnabled
) {
    this.cctx = cctx;
    this.type = type;
    this.log = log;
    this.pageSize = pageSize;
    this.timeout = timeout;
    this.incBackups = incBackups;
    this.dedup = dedup;
    this.prj = prj;
    this.filter = filter;
    this.part = part;
    this.clsName = clsName;
    this.clause = clause;
    this.limit = limit;
    this.incMeta = incMeta;
    this.keepBinary = keepBinary;
    this.subjId = subjId;
    this.taskHash = taskHash;
    this.mvccSnapshot = mvccSnapshot;
    this.dataPageScanEnabled = dataPageScanEnabled;
}
* @return Flag to enable data page scan.
public Boolean isDataPageScanEnabled() {
return dataPageScanEnabled;
* @return MVCC snapshot.
@Nullable MvccSnapshot mvccSnapshot() {
return mvccSnapshot;
* @return Type.
public GridCacheQueryType type() {
return type;
* @return Class name.
@Nullable public String queryClassName() {
return clsName;
* @return Clause.
@Nullable public String clause() {
return clause;
* @return Include metadata flag.
public boolean includeMetadata() {
return incMeta;
* @return {@code True} if binary should not be deserialized.
public boolean keepBinary() {
return keepBinary;
* Forces query to keep binary object representation even if query was created on plain projection.
* @param keepBinary Keep binary flag.
public void keepBinary(boolean keepBinary) {
this.keepBinary = keepBinary;
* @return {@code True} if the query is forced local.
public boolean forceLocal() {
return forceLocal;
* @return Security subject ID.
public UUID subjectId() {
return subjId;
* @return Task hash.
public int taskHash() {
return taskHash;
* @param subjId Security subject ID.
public void subjectId(UUID subjId) {
this.subjId = subjId;
/** {@inheritDoc} */
@Override public CacheQuery<T> pageSize(int pageSize) {
A.ensure(pageSize > 0, "pageSize > 0");
this.pageSize = pageSize;
return this;
* @return Page size.
public int pageSize() {
return pageSize;
/** {@inheritDoc} */
@Override public CacheQuery<T> timeout(long timeout) {
A.ensure(timeout >= 0, "timeout >= 0");
this.timeout = timeout;
return this;
* @return Response limit. Returns 0 for no limits.
public int limit() {
return limit;
/** {@inheritDoc} */
@Override public CacheQuery<T> limit(int limit) {
this.limit = limit;
return this;
* @return Timeout.
public long timeout() {
return timeout;
/** {@inheritDoc} */
@Override public CacheQuery<T> includeBackups(boolean incBackups) {
this.incBackups = incBackups;
return this;
* @return Include backups.
public boolean includeBackups() {
return incBackups;
/** {@inheritDoc} */
@Override public CacheQuery<T> enableDedup(boolean dedup) {
this.dedup = dedup;
return this;
* @return Enable dedup flag.
public boolean enableDedup() {
return dedup;
/** {@inheritDoc} */
@Override public CacheQuery<T> projection(ClusterGroup prj) {
this.prj = prj;
return this;
* @return Grid projection.
public ClusterGroup projection() {
return prj;
* @return Key-value filter.
@Nullable public <K, V> IgniteBiPredicate<K, V> scanFilter() {
return (IgniteBiPredicate<K, V>)filter;
* @return Transformer.
@Nullable public <K, V> IgniteClosure<Map.Entry<K, V>, Object> transform() {
return (IgniteClosure<Map.Entry<K, V>, Object>)transform;
* @return Partition.
@Nullable public Integer partition() {
return part;
* @throws IgniteCheckedException If query is invalid.
public void validate() throws IgniteCheckedException {
if ((type != SCAN && type != SET && type != SPI) && !QueryUtils.isEnabled(cctx.config()))
throw new IgniteCheckedException("Indexing is disabled for cache: " + cctx.cache().name());
/** {@inheritDoc} */
@Override public CacheQueryFuture<T> execute(@Nullable Object... args) {
return execute0(null, args);
/** {@inheritDoc} */
@Override public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> rmtReducer, @Nullable Object... args) {
return execute0(rmtReducer, args);
* @param rmtReducer Optional reducer.
* @param args Arguments.
* @return Future.
private <R> CacheQueryFuture<R> execute0(@Nullable IgniteReducer<T, R> rmtReducer, @Nullable Object... args) {
assert type != SCAN : this;
Collection<ClusterNode> nodes;
try {
nodes = nodes();
catch (IgniteCheckedException e) {
return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), e);
if (nodes.isEmpty())
return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), new ClusterGroupEmptyCheckedException());
if (log.isDebugEnabled())
log.debug("Executing query [query=" + this + ", nodes=" + nodes + ']');
if (cctx.deploymentEnabled()) {
try {
cctx.deploy().registerClasses(filter, rmtReducer);
catch (IgniteCheckedException e) {
return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), e);
if (subjId == null)
subjId = cctx.localNodeId();
taskHash = cctx.kernalContext().job().currentTaskNameHash();
final GridCacheQueryBean bean = new GridCacheQueryBean(this, (IgniteReducer<Object, Object>)rmtReducer,
null, args);
final GridCacheQueryManager qryMgr = cctx.queries();
boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId());
if (type == SQL_FIELDS || type == SPI)
return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
qryMgr.queryFieldsDistributed(bean, nodes));
return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes));
/** {@inheritDoc} */
@Override public GridCloseableIterator executeScanQuery() throws IgniteCheckedException {
assert type == SCAN : "Wrong processing of query: " + type;
if (!cctx.isLocal()) {
GridDhtCacheAdapter<?, ?> cacheAdapter = cctx.isNear() ? cctx.near().dht() : cctx.dht();
Set<Integer> lostParts = cacheAdapter.topology().lostPartitions();
if (!lostParts.isEmpty()) {
if (part == null || lostParts.contains(part)) {
throw new CacheException(new CacheInvalidStateException("Failed to execute query because cache partition " +
"has been lostParts [cacheName=" + +
", part=" + (part == null ? lostParts.iterator().next() : part) + ']'));
// Affinity nodes snapshot.
Collection<ClusterNode> nodes = new ArrayList<>(nodes());
if (nodes.isEmpty()) {
if (part != null) {
if (forceLocal) {
throw new IgniteCheckedException("No queryable nodes for partition " + part
+ " [forced local query=" + this + "]");
return new GridEmptyCloseableIterator();
if (log.isDebugEnabled())
log.debug("Executing query [query=" + this + ", nodes=" + nodes + ']');
if (cctx.deploymentEnabled())
if (subjId == null)
subjId = cctx.localNodeId();
taskHash = cctx.kernalContext().job().currentTaskNameHash();
final GridCacheQueryManager qryMgr = cctx.queries();
MvccQueryTracker mvccTracker = null;
if (cctx.mvccEnabled() && mvccSnapshot == null) {
GridNearTxLocal tx =;
if (tx != null)
mvccSnapshot = MvccUtils.requestSnapshot(tx);
else {
mvccTracker = MvccUtils.mvccTracker(cctx, null);
mvccSnapshot = mvccTracker.snapshot();
assert mvccSnapshot != null;
boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId());
GridCloseableIterator it;
if (loc)
it = qryMgr.scanQueryLocal(this, true);
else if (part != null)
it = new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx);
it = qryMgr.scanQueryDistributed(this, nodes);
return mvccTracker != null ? new MvccTrackingIterator(it, mvccTracker) : it;
* @return Nodes to execute on.
private Collection<ClusterNode> nodes() throws IgniteCheckedException {
CacheMode cacheMode = cctx.config().getCacheMode();
Integer part = partition();
switch (cacheMode) {
case LOCAL:
if (prj != null)
U.warn(log, "Ignoring query projection because it's executed over LOCAL cache " +
"(only local node will be queried): " + this);
if (type == SCAN && cctx.config().getCacheMode() == LOCAL &&
part != null && part >= cctx.affinity().partitions())
throw new IgniteCheckedException("Invalid partition number: " + part);
return Collections.singletonList(cctx.localNode());
if (prj != null || part != null)
return nodes(cctx, prj, part);
GridDhtPartitionTopology topology = cctx.topology();
if (cctx.affinityNode() && !topology.localPartitionMap().hasMovingPartitions())
return Collections.singletonList(cctx.localNode());
try {
Collection<ClusterNode> affNodes = nodes(cctx, null, null);
List<ClusterNode> nodes = new ArrayList<>(affNodes);
for (ClusterNode node : nodes) {
if (!topology.partitions(
return Collections.singletonList(node);
return affNodes;
finally {
return nodes(cctx, prj, part);
throw new IllegalStateException("Unknown cache distribution mode: " + cacheMode);
* @param cctx Cache context.
* @param prj Projection (optional).
* @return Collection of data nodes in provided projection (if any).
* @throws IgniteCheckedException If partition number is invalid.
private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx,
@Nullable final ClusterGroup prj, @Nullable final Integer part) throws IgniteCheckedException {
assert cctx != null;
final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
Collection<ClusterNode> affNodes = CU.affinityNodes(cctx, topVer);
if (prj == null && part == null)
return affNodes;
if (part != null && part >= cctx.affinity().partitions())
throw new IgniteCheckedException("Invalid partition number: " + part);
final Set<ClusterNode> owners =
part == null ? Collections.<ClusterNode>emptySet() : new HashSet<>(cctx.topology().owners(part, topVer));
return F.view(affNodes, new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode n) {
return cctx.discovery().cacheAffinityNode(n, &&
(prj == null || prj.node( != null) &&
(part == null || owners.contains(n));
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheQueryAdapter.class, this);
* Wrapper for queries with fallback.
private static class ScanQueryFallbackClosableIterator extends GridCloseableIteratorAdapter {
/** */
private static final long serialVersionUID = 0L;
/** Query future. */
private volatile T2<GridCloseableIterator<Object>, GridCacheQueryFutureAdapter> tuple;
/** Backups. */
private volatile Queue<ClusterNode> nodes;
/** Topology version of the last detected {@link GridDhtUnreservedPartitionException}. */
private volatile AffinityTopologyVersion unreservedTopVer;
/** Number of times to retry the query on the nodes failed with {@link GridDhtUnreservedPartitionException}. */
private volatile int unreservedNodesRetryCnt = 5;
/** Bean. */
private final GridCacheQueryAdapter qry;
/** Query manager. */
private final GridCacheQueryManager qryMgr;
/** Cache context. */
private final GridCacheContext cctx;
/** Partition. */
private final int part;
/** Flag indicating that a first item has been returned to a user. */
private boolean firstItemReturned;
/** */
private Object cur;
* @param part Partition.
* @param qry Query.
* @param qryMgr Query manager.
* @param cctx Cache context.
private ScanQueryFallbackClosableIterator(int part, GridCacheQueryAdapter qry,
GridCacheQueryManager qryMgr, GridCacheContext cctx) {
this.qry = qry;
this.qryMgr = qryMgr;
this.cctx = cctx;
this.part = part;
nodes = fallbacks(cctx.shared().exchange().readyAffinityVersion());
if (F.isEmpty(nodes))
throw new ClusterTopologyException("Failed to execute the query " +
"(all affinity nodes left the grid) [cache=" + +
", qry=" + qry +
", curTopVer=" + qryMgr.queryTopologyVersion().topologyVersion() + ']');
* @param topVer Topology version.
* @return Nodes for query execution.
private Queue<ClusterNode> fallbacks(AffinityTopologyVersion topVer) {
Deque<ClusterNode> fallbacks = new LinkedList<>();
Collection<ClusterNode> owners = new HashSet<>();
for (ClusterNode node : cctx.topology().owners(part, topVer)) {
if (node.isLocal())
for (ClusterNode node : cctx.topology().moving(part)) {
if (!owners.contains(node))
return fallbacks;
private void init() {
final ClusterNode node = nodes.poll();
if (node.isLocal()) {
try {
GridCloseableIterator it = qryMgr.scanQueryLocal(qry, true);
tuple = new T2(it, null);
catch (IgniteClientDisconnectedCheckedException e) {
throw CU.convertToCacheException(e);
catch (IgniteCheckedException e) {
else {
final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, qry.transform, null);
GridCacheQueryFutureAdapter fut =
(GridCacheQueryFutureAdapter)qryMgr.queryDistributed(bean, Collections.singleton(node));
tuple = new T2(null, fut);
/** {@inheritDoc} */
@Override protected Object onNext() throws IgniteCheckedException {
if (!onHasNext())
throw new NoSuchElementException();
assert cur != null;
Object e = cur;
cur = null;
return e;
/** {@inheritDoc} */
@Override protected boolean onHasNext() throws IgniteCheckedException {
while (true) {
if (cur != null)
return true;
T2<GridCloseableIterator<Object>, GridCacheQueryFutureAdapter> t = tuple;
GridCloseableIterator<Object> iter = t.get1();
if (iter != null) {
boolean hasNext = iter.hasNext();
if (hasNext)
cur =;
return hasNext;
else {
GridCacheQueryFutureAdapter fut = t.get2();
assert fut != null;
if (firstItemReturned)
return (cur = convert( != null;
try {
firstItemReturned = true;
return (cur = convert( != null;
catch (IgniteClientDisconnectedCheckedException e) {
throw CU.convertToCacheException(e);
catch (IgniteCheckedException e) {
* @param obj Entry to convert.
* @return Cache entry
private Object convert(Object obj) {
if (qry.transform() != null)
return obj;
Map.Entry e = (Map.Entry)obj;
return e == null ? null : new CacheQueryEntry(e.getKey(), e.getValue());
* @param e Exception for query run.
private void retryIfPossible(IgniteCheckedException e) {
try {
IgniteInternalFuture<?> retryFut;
GridDhtUnreservedPartitionException partErr = X.cause(e, GridDhtUnreservedPartitionException.class);
if (partErr != null) {
AffinityTopologyVersion waitVer = partErr.topologyVersion();
assert waitVer != null;
retryFut = cctx.shared().exchange().affinityReadyFuture(waitVer);
else if (e.hasCause(ClusterTopologyCheckedException.class)) {
ClusterTopologyCheckedException topEx = X.cause(e, ClusterTopologyCheckedException.class);
retryFut = topEx.retryReadyFuture();
else if (e.hasCause(ClusterGroupEmptyCheckedException.class)) {
ClusterGroupEmptyCheckedException ex = X.cause(e, ClusterGroupEmptyCheckedException.class);
retryFut = ex.retryReadyFuture();
throw CU.convertToCacheException(e);
if (F.isEmpty(nodes)) {
if (--unreservedNodesRetryCnt > 0) {
if (retryFut != null)
nodes = fallbacks(unreservedTopVer == null ? cctx.shared().exchange().readyAffinityVersion() : unreservedTopVer);
unreservedTopVer = null;
throw CU.convertToCacheException(e);
catch (IgniteCheckedException ex) {
throw CU.convertToCacheException(ex);
/** {@inheritDoc} */
@Override protected void onClose() throws IgniteCheckedException {
T2<GridCloseableIterator<Object>, GridCacheQueryFutureAdapter> t = tuple;
if (t != null && t.get1() != null)
if (t != null && t.get2() != null)
* Wrapper for an MVCC-related iterators.
private static class MvccTrackingIterator implements GridCloseableIterator {
/** Serial version uid. */
private static final long serialVersionUID = -1905248502802333832L;
/** Underlying iterator. */
private final GridCloseableIterator it;
/** Query MVCC tracker. */
private final MvccQueryTracker mvccTracker;
* Constructor.
* @param it Underlying iterator.
* @param mvccTracker Query MVCC tracker.
MvccTrackingIterator(GridCloseableIterator it, MvccQueryTracker mvccTracker) {
assert it != null && mvccTracker != null; = it;
this.mvccTracker = mvccTracker;
/** {@inheritDoc} */
@Override public void close() throws IgniteCheckedException {
if (isClosed())
try {
finally {
/** {@inheritDoc} */
@Override public boolean isClosed() {
return it.isClosed();
/** {@inheritDoc} */
@Override public boolean hasNext() {
boolean hasNext = it.hasNext();
if (!hasNext)
try {
catch (IgniteCheckedException e) {
throw new IgniteException(e);
return hasNext;
/** {@inheritDoc} */
@Override public boolean hasNextX() throws IgniteCheckedException {
boolean hasNext = it.hasNext();
if (!hasNext)
return hasNext;
/** {@inheritDoc} */
@Override public Object nextX() throws IgniteCheckedException {
return it.nextX();
/** {@inheritDoc} */
@Override public void removeX() throws IgniteCheckedException {
/** {@inheritDoc} */
@NotNull @Override public Iterator iterator() {
return this;
/** {@inheritDoc} */
@Override public Object next() {