blob: 64d6b9a5aa42cf2ad1c83521a0640700bf7c90dd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.query;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.sql.SQLException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.QueryIndexType;
import org.apache.ignite.cache.query.IndexQuery;
import org.apache.ignite.cache.query.QueryMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.CacheQueryExecutedEvent;
import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cache.query.index.IndexQueryResult;
import org.apache.ignite.internal.cache.query.index.IndexQueryResultMeta;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.metric.IoStatisticsQueryHelper;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
import org.apache.ignite.internal.processors.datastructures.GridSetQueryPredicate;
import org.apache.ignite.internal.processors.datastructures.SetItemKey;
import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.security.SecurityUtils;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.GridBoundedPriorityQueue;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.GridSpiCloseableIteratorWrapper;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.lang.IgniteClosureX;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.IgniteSpiCloseableIterator;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
import org.apache.ignite.spi.indexing.IndexingSpi;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_QUIET;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.INDEX;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SCAN;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SPI;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
import static org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId;
import static org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
/**
* Query and index manager.
*/
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapter<K, V> {
/** Maximum number of query detail metrics to evict at once. */
private static final int QRY_DETAIL_METRICS_EVICTION_LIMIT = 10_000;
/** Support 'not null' field constraint since v 2.3.0. */
private static final IgniteProductVersion NOT_NULLS_SUPPORT_VER = IgniteProductVersion.fromString("2.3.0");
/** Comparator for priority queue with query detail metrics with priority to new metrics. */
private static final Comparator<GridCacheQueryDetailMetricsAdapter> QRY_DETAIL_METRICS_PRIORITY_NEW_CMP =
new Comparator<GridCacheQueryDetailMetricsAdapter>() {
@Override public int compare(GridCacheQueryDetailMetricsAdapter m1, GridCacheQueryDetailMetricsAdapter m2) {
return Long.compare(m1.lastStartTime(), m2.lastStartTime());
}
};
/** Comparator for priority queue with query detail metrics with priority to old metrics. */
private static final Comparator<GridCacheQueryDetailMetricsAdapter> QRY_DETAIL_METRICS_PRIORITY_OLD_CMP =
new Comparator<GridCacheQueryDetailMetricsAdapter>() {
@Override public int compare(GridCacheQueryDetailMetricsAdapter m1, GridCacheQueryDetailMetricsAdapter m2) {
return Long.compare(m2.lastStartTime(), m1.lastStartTime());
}
};
/** Function to merge query detail metrics. */
private static final BiFunction<
GridCacheQueryDetailMetricsAdapter,
GridCacheQueryDetailMetricsAdapter,
GridCacheQueryDetailMetricsAdapter>
QRY_DETAIL_METRICS_MERGE_FX = GridCacheQueryDetailMetricsAdapter::aggregate;
/** */
private final boolean isIndexingSpiAllowsBinary =
!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI);
/** */
private GridQueryProcessor qryProc;
/** */
private String cacheName;
/** */
private int maxIterCnt;
/** */
private volatile GridCacheQueryMetricsAdapter metrics;
/** */
private int detailMetricsSz;
/** */
private ConcurrentHashMap<GridCacheQueryDetailMetricsKey, GridCacheQueryDetailMetricsAdapter> detailMetrics;
/** */
private final ConcurrentMap<UUID, RequestFutureMap> qryIters = new ConcurrentHashMap<>();
/** Local query iterators. */
private final GridConcurrentHashSet<ScanQueryIterator> locIters = new GridConcurrentHashSet<>();
/** */
private final ConcurrentMap<UUID, Map<Long, GridFutureAdapter<FieldsResult>>> fieldsQryRes =
new ConcurrentHashMap<>();
/** */
private volatile ConcurrentMap<Object, CachedResult<?>> qryResCache = new ConcurrentHashMap<>();
/** */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
/** Event listener. */
private GridLocalEventListener lsnr;
/** */
private volatile boolean enabled;
/** */
private volatile boolean qryProcEnabled;
/** */
private AffinityTopologyVersion qryTopVer;
/** {@inheritDoc} */
@Override public void start0() throws IgniteCheckedException {
CacheConfiguration ccfg = cctx.config();
qryProcEnabled = QueryUtils.isEnabled(ccfg);
qryProc = cctx.kernalContext().query();
cacheName = cctx.name();
enabled = qryProcEnabled || (isIndexingSpiEnabled() && !CU.isSystemCache(cacheName));
maxIterCnt = ccfg.getMaxQueryIteratorsCount();
detailMetricsSz = ccfg.getQueryDetailMetricsSize();
if (detailMetricsSz > 0)
detailMetrics = new ConcurrentHashMap<>(detailMetricsSz);
lsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
Map<Long, GridFutureAdapter<QueryResult<K, V>>> futs = qryIters.remove(nodeId);
if (futs != null) {
for (Map.Entry<Long, GridFutureAdapter<QueryResult<K, V>>> entry : futs.entrySet()) {
final Object rcpt = recipient(nodeId, entry.getKey());
entry.getValue().listen(new CIX1<IgniteInternalFuture<QueryResult<K, V>>>() {
@Override public void applyx(IgniteInternalFuture<QueryResult<K, V>> f)
throws IgniteCheckedException {
f.get().closeIfNotShared(rcpt);
}
});
}
}
Map<Long, GridFutureAdapter<FieldsResult>> fieldsFuts = fieldsQryRes.remove(nodeId);
if (fieldsFuts != null) {
for (Map.Entry<Long, GridFutureAdapter<FieldsResult>> entry : fieldsFuts.entrySet()) {
final Object rcpt = recipient(nodeId, entry.getKey());
entry.getValue().listen(new CIX1<IgniteInternalFuture<FieldsResult>>() {
@Override public void applyx(IgniteInternalFuture<FieldsResult> f)
throws IgniteCheckedException {
f.get().closeIfNotShared(rcpt);
}
});
}
}
}
};
metrics = new GridCacheQueryMetricsAdapter(cctx.kernalContext().metric(), cctx.name(), cctx.isNear());
cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
qryTopVer = cctx.startTopologyVersion();
assert qryTopVer != null : cctx.name();
}
/**
* @return {@code True} if indexing is enabled for cache.
*/
public boolean enabled() {
return enabled;
}
/**
* Enable query manager.
*/
public void enable() {
qryProcEnabled = true;
enabled = true;
}
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
busyLock.block();
cctx.events().removeListener(lsnr);
onCancelAtStop();
}
/**
* @return {@code True} if entered busy state.
*/
private boolean enterBusy() {
return busyLock.enterBusy();
}
/**
* Leaves busy state.
*/
private void leaveBusy() {
busyLock.leaveBusy();
}
/**
* Stops query manager.
*
* @param cancel Cancel queries.
* @param destroy Cache destroy flag..
*/
@Override public final void stop0(boolean cancel, boolean destroy) {
if (log.isDebugEnabled())
log.debug("Stopped cache query manager.");
}
/**
* Marks this request as canceled.
*
* @param reqId Request id.
*/
void onQueryFutureCanceled(long reqId) {
// No-op.
}
/**
* Cancel flag handler at stop.
*/
void onCancelAtStop() {
// No-op.
}
/**
* Processes cache query request.
*
* @param sndId Sender node id.
* @param req Query request.
*/
void processQueryRequest(UUID sndId, GridCacheQueryRequest req) {
// No-op.
}
/**
* Checks if IndexinSPI is enabled.
*
* @return IndexingSPI enabled flag.
*/
private boolean isIndexingSpiEnabled() {
return cctx.kernalContext().indexing().enabled();
}
/**
*
*/
private void invalidateResultCache() {
if (!qryResCache.isEmpty())
qryResCache = new ConcurrentHashMap<>();
}
/**
* @param newRow New row.
* @param prevRow Previous row.
* @param prevRowAvailable Whether previous row is available.
* @throws IgniteCheckedException In case of error.
*/
public void store(CacheDataRow newRow, @Nullable CacheDataRow prevRow,
boolean prevRowAvailable) throws IgniteCheckedException {
assert enabled();
assert newRow != null && newRow.value() != null && newRow.link() != 0 : newRow;
if (!enterBusy())
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
try {
if (isIndexingSpiEnabled()) {
CacheObjectContext coctx = cctx.cacheObjectContext();
Object key0 = unwrapIfNeeded(newRow.key(), coctx);
Object val0 = unwrapIfNeeded(newRow.value(), coctx);
cctx.kernalContext().indexing().store(cacheName, key0, val0, newRow.expireTime());
}
if (qryProcEnabled)
qryProc.store(cctx, newRow, prevRow, prevRowAvailable);
}
finally {
invalidateResultCache();
leaveBusy();
}
}
/**
* @param key Key.
* @param prevRow Previous row.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow)
throws IgniteCheckedException {
if (!qryProcEnabled)
return; // No-op.
if (!enterBusy())
return; // Ignore index update when node is stopping.
try {
if (isIndexingSpiEnabled()) {
Object key0 = unwrapIfNeeded(key, cctx.cacheObjectContext());
cctx.kernalContext().indexing().remove(cacheName, key0);
}
// val may be null if we have no previous value. We should not call processor in this case.
if (qryProcEnabled && prevRow != null)
qryProc.remove(cctx, prevRow);
}
finally {
invalidateResultCache();
leaveBusy();
}
}
/**
* Executes local query.
*
* @param qry Query.
* @return Query future.
*/
@SuppressWarnings("unchecked")
CacheQueryFuture<?> queryLocal(GridCacheQueryBean qry) {
assert qry.query().type() != GridCacheQueryType.SCAN : qry;
if (log.isDebugEnabled())
log.debug("Executing query on local node: " + qry);
GridCacheLocalQueryFuture fut = new GridCacheLocalQueryFuture<>(cctx, qry);
try {
qry.query().validate();
fut.execute();
}
catch (IgniteCheckedException e) {
if (fut != null)
fut.onDone(e);
}
return fut;
}
/**
* Executes distributed query.
*
* @param qry Query.
* @param nodes Nodes.
* @return Query future.
*/
public abstract CacheQueryFuture<?> queryDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes);
/**
* Executes distributed SCAN query.
*
* @param qry Query.
* @param nodes Nodes.
* @return Iterator.
* @throws IgniteCheckedException If failed.
*/
public abstract GridCloseableIterator scanQueryDistributed(GridCacheQueryAdapter qry,
Collection<ClusterNode> nodes) throws IgniteCheckedException;
/**
* Executes distributed fields query.
*
* @param qry Query.
* @return Query future.
*/
public abstract CacheQueryFuture<?> queryFieldsLocal(GridCacheQueryBean qry);
/**
* Executes distributed fields query.
*
* @param qry Query.
* @param nodes Nodes.
* @return Query future.
*/
public abstract CacheQueryFuture<?> queryFieldsDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes);
/**
* Unwrap CacheObject if needed.
*/
private Object unwrapIfNeeded(CacheObject obj, CacheObjectContext coctx) {
return isIndexingSpiAllowsBinary && cctx.cacheObjects().isBinaryObject(obj) ? obj : obj.value(coctx, false);
}
/**
* Performs query.
*
* @param qry Query.
* @param args Arguments.
* @param loc Local query or not.
* @param taskName Task name.
* @param rcpt ID of the recipient.
* @return Collection of found keys.
* @throws IgniteCheckedException In case of error.
*/
@SuppressWarnings("unchecked")
private QueryResult<K, V> executeQuery(GridCacheQueryAdapter<?> qry, @Nullable Object[] args,
IgniteClosure transformer, boolean loc, @Nullable String taskName, Object rcpt)
throws IgniteCheckedException {
if (qry.type() == null) {
assert !loc;
throw new IgniteCheckedException("Received next page request after iterator was removed. " +
"Consider increasing maximum number of stored iterators (see " +
"CacheConfiguration.getMaxQueryIteratorsCount() configuration property).");
}
QueryResult<K, V> res;
T3<String, String, List<Object>> resKey = null;
if (qry.type() == SQL) {
resKey = new T3<>(qry.queryClassName(), qry.clause(), F.asList(args));
res = (QueryResult<K, V>)qryResCache.get(resKey);
if (res != null && res.addRecipient(rcpt))
return res;
res = new QueryResult<>(qry.type(), rcpt);
if (qryResCache.putIfAbsent(resKey, res) != null)
resKey = null;
}
else
res = new QueryResult<>(qry.type(), rcpt);
GridCloseableIterator<IgniteBiTuple<K, V>> iter;
try {
switch (qry.type()) {
case SQL:
throw new IllegalStateException("Should never be called.");
case SCAN:
if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
cctx.gridEvents().record(new CacheQueryExecutedEvent<>(
cctx.localNode(),
"Scan query executed.",
EVT_CACHE_QUERY_EXECUTED,
CacheQueryType.SCAN.name(),
cctx.name(),
null,
null,
qry.scanFilter(),
null,
null,
securitySubjectId(cctx),
taskName));
}
iter = scanIterator(qry, transformer, false);
break;
case TEXT:
if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
cctx.gridEvents().record(new CacheQueryExecutedEvent<>(
cctx.localNode(),
"Full text query executed.",
EVT_CACHE_QUERY_EXECUTED,
CacheQueryType.FULL_TEXT.name(),
cctx.name(),
qry.queryClassName(),
qry.clause(),
null,
null,
null,
securitySubjectId(cctx),
taskName));
}
iter = qryProc.queryText(cacheName, qry.clause(), qry.queryClassName(), filter(qry), qry.limit());
break;
case SET:
iter = sharedCacheSetIterator(qry);
break;
case INDEX:
int[] parts = null;
if (qry.partition() != null)
parts = new int[]{qry.partition()};
IndexQueryResult<K, V> idxQryRes = qryProc.queryIndex(cacheName, qry.queryClassName(), qry.idxQryDesc(),
qry.scanFilter(), filter(qry, parts, parts != null), qry.keepBinary());
iter = idxQryRes.iter();
res.metadata(idxQryRes.metadata());
break;
case SQL_FIELDS:
assert false : "SQL fields query is incorrectly processed.";
default:
throw new IgniteCheckedException("Unknown query type: " + qry.type());
}
res.onDone(iter);
}
catch (Exception e) {
res.onDone(e);
}
finally {
if (resKey != null)
qryResCache.remove(resKey, res);
}
return res;
}
/**
* Performs fields query.
*
* @param qry Query.
* @param args Arguments.
* @param loc Local query or not.
* @param taskName Task name.
* @param rcpt ID of the recipient.
* @return Collection of found keys.
* @throws IgniteCheckedException In case of error.
*/
private FieldsResult executeFieldsQuery(GridCacheQueryAdapter<?> qry, @Nullable Object[] args,
boolean loc, @Nullable String taskName, Object rcpt) throws IgniteCheckedException {
assert qry != null;
FieldsResult res;
T2<String, List<Object>> resKey = null;
if (qry.clause() == null && qry.type() != SPI) {
assert !loc;
throw new IgniteCheckedException("Received next page request after iterator was removed. " +
"Consider increasing maximum number of stored iterators (see " +
"CacheConfiguration.getMaxQueryIteratorsCount() configuration property).");
}
if (qry.type() == SQL_FIELDS) {
if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
cctx.gridEvents().record(new CacheQueryExecutedEvent<>(
cctx.localNode(),
"SQL fields query executed.",
EVT_CACHE_QUERY_EXECUTED,
CacheQueryType.SQL_FIELDS.name(),
cctx.name(),
null,
qry.clause(),
null,
null,
args,
securitySubjectId(cctx),
taskName));
}
// Attempt to get result from cache.
resKey = new T2<>(qry.clause(), F.asList(args));
res = (FieldsResult)qryResCache.get(resKey);
if (res != null && res.addRecipient(rcpt))
return res; // Cached result found.
res = new FieldsResult(rcpt);
if (qryResCache.putIfAbsent(resKey, res) != null)
resKey = null; // Failed to cache result.
}
else {
assert qry.type() == SPI : "Unexpected query type: " + qry.type();
if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
cctx.gridEvents().record(new CacheQueryExecutedEvent<>(
cctx.localNode(),
"SPI query executed.",
EVT_CACHE_QUERY_EXECUTED,
CacheQueryType.SPI.name(),
cctx.name(),
null,
null,
null,
null,
args,
securitySubjectId(cctx),
taskName));
}
res = new FieldsResult(rcpt);
}
try {
if (qry.type() == SPI) {
IgniteSpiCloseableIterator<?> iter = cctx.kernalContext().indexing().query(cacheName, F.asList(args),
filter(qry));
res.onDone(iter);
}
else {
assert qry.type() == SQL_FIELDS;
throw new IllegalStateException("Should never be called.");
}
}
catch (Exception e) {
res.onDone(e);
}
finally {
if (resKey != null)
qryResCache.remove(resKey, res);
}
return res;
}
/**
* @param qry Query.
* @return Cache set items iterator.
*/
private GridCloseableIterator<IgniteBiTuple<K, V>> sharedCacheSetIterator(GridCacheQueryAdapter<?> qry)
throws IgniteCheckedException {
final GridSetQueryPredicate filter = (GridSetQueryPredicate)qry.scanFilter();
IgniteUuid id = filter.setId();
GridCacheQueryAdapter<CacheEntry<K, ?>> qry0 = new GridCacheQueryAdapter<>(cctx,
SCAN,
new IgniteBiPredicate<Object, Object>() {
@Override public boolean apply(Object k, Object v) {
return k instanceof SetItemKey && id.equals(((SetItemKey)k).setId());
}
},
new IgniteClosure<Map.Entry, Object>() {
@Override public Object apply(Map.Entry entry) {
return new IgniteBiTuple<K, V>((K)((SetItemKey)entry.getKey()).item(), (V)Boolean.TRUE);
}
},
qry.partition(),
false,
true,
qry.isDataPageScanEnabled());
return scanQueryLocal(qry0, false);
}
/**
* @param qry Query.
* @param transformer Transformer.
* @param locNode Local node.
* @return Full-scan row iterator.
* @throws IgniteCheckedException If failed to get iterator.
*/
@SuppressWarnings({"unchecked"})
private GridCloseableIterator scanIterator(final GridCacheQueryAdapter<?> qry, IgniteClosure transformer,
boolean locNode)
throws IgniteCheckedException {
final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter();
final InternalScanFilter<K, V> intFilter = keyValFilter != null ? new InternalScanFilter<>(keyValFilter) : null;
try {
if (keyValFilter instanceof PlatformCacheEntryFilter)
((PlatformCacheEntryFilter)keyValFilter).cacheContext(cctx);
else
injectResources(keyValFilter);
Integer part = qry.partition();
if (part != null && (part < 0 || part >= cctx.affinity().partitions()))
return new GridEmptyCloseableIterator() {
@Override public void close() throws IgniteCheckedException {
if (intFilter != null)
intFilter.close();
super.close();
}
};
AffinityTopologyVersion topVer = GridQueryProcessor.getRequestAffinityTopologyVersion();
if (topVer == null)
topVer = cctx.affinity().affinityTopologyVersion();
final boolean backups = qry.includeBackups() || cctx.isReplicated();
final GridDhtLocalPartition locPart;
final GridIterator<CacheDataRow> it;
if (part != null) {
final GridDhtCacheAdapter dht = cctx.isNear() ? cctx.near().dht() : cctx.dht();
GridDhtLocalPartition locPart0 = dht.topology().localPartition(part, topVer, false);
if (locPart0 == null || locPart0.state() != OWNING || !locPart0.reserve()) {
throw locPart0 != null && locPart0.state() == LOST ?
new CacheInvalidStateException("Failed to execute scan query because cache partition has been " +
"lost [cacheName=" + cctx.name() + ", part=" + part + "]") :
new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(),
"Partition can not be reserved");
}
locPart = locPart0;
it = cctx.offheap().cachePartitionIterator(cctx.cacheId(), part, qry.mvccSnapshot(),
qry.isDataPageScanEnabled());
}
else {
locPart = null;
final GridDhtCacheAdapter dht = cctx.isNear() ? cctx.near().dht() : cctx.dht();
Set<Integer> lostParts = dht.topology().lostPartitions();
if (!lostParts.isEmpty()) {
throw new CacheInvalidStateException("Failed to execute scan query because cache partition " +
"has been lost [cacheName=" + cctx.name() + ", part=" + lostParts.iterator().next() + "]");
}
it = cctx.offheap().cacheIterator(cctx.cacheId(), true, backups, topVer,
qry.mvccSnapshot(), qry.isDataPageScanEnabled());
}
ScanQueryIterator iter = new ScanQueryIterator(it, qry, topVer, locPart,
SecurityUtils.sandboxedProxy(cctx.kernalContext(), IgniteBiPredicate.class, keyValFilter),
SecurityUtils.sandboxedProxy(cctx.kernalContext(), IgniteClosure.class, transformer),
locNode, locNode ? locIters : null, cctx, log);
if (locNode) {
ScanQueryIterator old = locIters.addx(iter);
assert old == null;
}
return iter;
}
catch (IgniteCheckedException | RuntimeException e) {
if (intFilter != null)
intFilter.close();
throw e;
}
}
/**
* @param o Object to inject resources to.
* @throws IgniteCheckedException If failure occurred while injecting resources.
*/
private void injectResources(@Nullable Object o) throws IgniteCheckedException {
if (o != null) {
GridKernalContext ctx = cctx.kernalContext();
ClassLoader ldr = o.getClass().getClassLoader();
if (ctx.deploy().isGlobalLoader(ldr))
ctx.resource().inject(ctx.deploy().getDeployment(ctx.deploy().getClassLoaderId(ldr)), o.getClass(), o);
else
ctx.resource().inject(ctx.deploy().getDeployment(o.getClass().getName()), o.getClass(), o);
}
}
/**
* Processes fields query request.
*
* @param qryInfo Query info.
*/
protected void runFieldsQuery(GridCacheQueryInfo qryInfo) {
assert qryInfo != null;
if (!enterBusy()) {
if (cctx.localNodeId().equals(qryInfo.senderId()))
throw new IllegalStateException("Failed to process query request (grid is stopping).");
return; // Ignore remote requests when when node is stopping.
}
try {
if (log.isDebugEnabled())
log.debug("Running query: " + qryInfo);
boolean rmvRes = true;
FieldsResult res = null;
final boolean statsEnabled = cctx.statisticsEnabled();
final boolean readEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
try {
// Preparing query closures.
IgniteReducer<Object, Object> rdc = (IgniteReducer<Object, Object>)qryInfo.reducer();
injectResources(rdc);
GridCacheQueryAdapter<?> qry = qryInfo.query();
int pageSize = qry.pageSize();
Collection<Object> data = null;
Collection<Object> entities = null;
if (qryInfo.local() || rdc != null || cctx.isLocalNode(qryInfo.senderId()))
data = new ArrayList<>(pageSize);
else
entities = new ArrayList<>(pageSize);
String taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash());
res = qryInfo.local() ?
executeFieldsQuery(qry, qryInfo.arguments(), qryInfo.local(), taskName,
recipient(qryInfo.senderId(), qryInfo.requestId())) :
fieldsQueryResult(qryInfo, taskName);
// If metadata needs to be returned to user and cleaned from internal fields - copy it.
List<GridQueryFieldMetadata> meta = qryInfo.includeMetaData() ?
(res.metaData() != null ? new ArrayList<>(res.metaData()) : null) :
res.metaData();
if (!qryInfo.includeMetaData())
meta = null;
GridCloseableIterator<?> it = new GridSpiCloseableIteratorWrapper<Object>(
res.iterator(recipient(qryInfo.senderId(), qryInfo.requestId())));
if (log.isDebugEnabled())
log.debug("Received fields iterator [iterHasNext=" + it.hasNext() + ']');
if (!it.hasNext()) {
if (rdc != null)
data = Collections.singletonList(rdc.reduce());
onFieldsPageReady(qryInfo.local(), qryInfo, meta, entities, data, true, null);
return;
}
int cnt = 0;
boolean metaSent = false;
while (!Thread.currentThread().isInterrupted() && it.hasNext()) {
long start = statsEnabled ? System.nanoTime() : 0L;
Object row = it.next();
// Query is cancelled.
if (row == null) {
onPageReady(qryInfo.local(), qryInfo, null, null, true, null);
break;
}
if (statsEnabled) {
CacheMetricsImpl metrics = cctx.cache().metrics0();
metrics.onRead(true);
metrics.addGetTimeNanos(System.nanoTime() - start);
}
if (readEvt && cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ)) {
cctx.gridEvents().record(new CacheQueryReadEvent<K, V>(
cctx.localNode(),
"SQL fields query result set row read.",
EVT_CACHE_QUERY_OBJECT_READ,
CacheQueryType.SQL_FIELDS.name(),
cctx.name(),
null,
qry.clause(),
null,
null,
qryInfo.arguments(),
securitySubjectId(cctx),
taskName,
null,
null,
null,
row));
}
if ((qryInfo.local() || rdc != null || cctx.isLocalNode(qryInfo.senderId()))) {
// Reduce.
if (rdc != null) {
if (!rdc.collect(row))
break;
}
else
data.add(row);
}
else
entities.add(row);
if (rdc == null && ((!qryInfo.allPages() && ++cnt == pageSize) || !it.hasNext())) {
onFieldsPageReady(qryInfo.local(), qryInfo, !metaSent ? meta : null,
entities, data, !it.hasNext(), null);
if (it.hasNext())
rmvRes = false;
if (!qryInfo.allPages())
return;
}
}
if (rdc != null) {
onFieldsPageReady(qryInfo.local(), qryInfo, meta, null,
Collections.singletonList(rdc.reduce()), true, null);
}
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled() || !e.hasCause(SQLException.class))
U.error(log, "Failed to run fields query [qry=" + qryInfo + ", node=" + cctx.nodeId() + ']', e);
else {
if (e.hasCause(SQLException.class))
U.error(log, "Failed to run fields query [node=" + cctx.nodeId() +
", msg=" + e.getCause(SQLException.class).getMessage() + ']');
else
U.error(log, "Failed to run fields query [node=" + cctx.nodeId() +
", msg=" + e.getMessage() + ']');
}
onFieldsPageReady(qryInfo.local(), qryInfo, null, null, null, true, e);
}
catch (Throwable e) {
U.error(log, "Failed to run fields query [qry=" + qryInfo + ", node=" + cctx.nodeId() + "]", e);
onFieldsPageReady(qryInfo.local(), qryInfo, null, null, null, true, e);
if (e instanceof Error)
throw (Error)e;
}
finally {
if (qryInfo.local()) {
// Don't we need to always remove local iterators?
if (rmvRes && res != null) {
try {
res.closeIfNotShared(recipient(qryInfo.senderId(), qryInfo.requestId()));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to close local iterator [qry=" + qryInfo + ", node=" +
cctx.nodeId() + "]", e);
}
}
}
else if (rmvRes)
removeFieldsQueryResult(qryInfo.senderId(), qryInfo.requestId());
}
}
finally {
leaveBusy();
}
}
/**
* Processes cache query request.
*
* @param qryInfo Query info.
*/
@SuppressWarnings("unchecked")
protected void runQuery(GridCacheQueryInfo qryInfo) {
assert qryInfo != null;
assert qryInfo.query().type() != SCAN || !qryInfo.local() : qryInfo;
if (!enterBusy()) {
if (cctx.localNodeId().equals(qryInfo.senderId()))
throw new IllegalStateException("Failed to process query request (grid is stopping).");
return; // Ignore remote requests when when node is stopping.
}
try {
boolean performanceStatsEnabled = cctx.kernalContext().performanceStatistics().enabled();
if (performanceStatsEnabled)
IoStatisticsQueryHelper.startGatheringQueryStatistics();
boolean loc = qryInfo.local();
QueryResult<K, V> res = null;
if (log.isDebugEnabled())
log.debug("Running query: " + qryInfo);
boolean rmvIter = true;
GridCacheQueryAdapter<?> qry = qryInfo.query();
try {
// Preparing query closures.
IgniteClosure<Cache.Entry<K, V>, Object> trans =
(IgniteClosure<Cache.Entry<K, V>, Object>)qryInfo.transformer();
IgniteReducer<Cache.Entry<K, V>, Object> rdc = (IgniteReducer<Cache.Entry<K, V>, Object>)qryInfo.reducer();
injectResources(trans);
injectResources(rdc);
int pageSize = qry.pageSize();
boolean incBackups = qry.includeBackups();
String taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash());
IgniteSpiCloseableIterator iter;
GridCacheQueryType type;
res = loc ?
executeQuery(qry, qryInfo.arguments(), trans, loc, taskName,
recipient(qryInfo.senderId(), qryInfo.requestId())) :
queryResult(qryInfo, taskName);
if (res == null)
return;
iter = res.iterator(recipient(qryInfo.senderId(), qryInfo.requestId()));
type = res.type();
final GridCacheAdapter<K, V> cache = cctx.cache();
if (log.isDebugEnabled())
log.debug("Received index iterator [iterHasNext=" + iter.hasNext() +
", cacheSize=" + cache.size() + ']');
int cnt = 0;
boolean stop = false;
boolean pageSent = false;
Collection<Object> data = new ArrayList<>(pageSize);
AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
final boolean statsEnabled = cctx.statisticsEnabled();
final boolean readEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
CacheObjectContext objCtx = cctx.cacheObjectContext();
while (!Thread.currentThread().isInterrupted()) {
long start = statsEnabled ? System.nanoTime() : 0L;
// Need to call it after gathering start time because
// actual row extracting may happen inside this method.
if (!iter.hasNext())
break;
Object row0 = iter.next();
// Query is cancelled.
if (row0 == null) {
onPageReady(loc, qryInfo, null, null, true, null);
break;
}
if (type == SCAN || type == INDEX)
// Scan iterator may return already transformed entry
data.add(row0);
else {
IgniteBiTuple<K, V> row = (IgniteBiTuple<K, V>)row0;
final K key = row.getKey();
final V val = row.getValue();
if (log.isDebugEnabled()) {
ClusterNode primaryNode = cctx.affinity().primaryByKey(key,
cctx.affinity().affinityTopologyVersion());
log.debug(S.toString("Record",
"key", key, true,
"val", val, true,
"incBackups", incBackups, false,
"priNode", primaryNode != null ? U.id8(primaryNode.id()) : null, false,
"node", U.id8(cctx.localNode().id()), false));
}
if (val == null) {
if (log.isDebugEnabled())
log.debug(S.toString("Unsuitable record value", "val", val, true));
continue;
}
if (statsEnabled) {
CacheMetricsImpl metrics = cctx.cache().metrics0();
metrics.onRead(true);
metrics.addGetTimeNanos(System.nanoTime() - start);
}
K key0 = null;
V val0 = null;
if (readEvt && cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ)) {
key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, key, qry.keepBinary(), false, null);
val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, val, qry.keepBinary(), false, null);
switch (type) {
case SQL:
cctx.gridEvents().record(new CacheQueryReadEvent<>(
cctx.localNode(),
"SQL query entry read.",
EVT_CACHE_QUERY_OBJECT_READ,
CacheQueryType.SQL.name(),
cctx.name(),
qry.queryClassName(),
qry.clause(),
null,
null,
qryInfo.arguments(),
securitySubjectId(cctx),
taskName,
key0,
val0,
null,
null));
break;
case TEXT:
cctx.gridEvents().record(new CacheQueryReadEvent<>(
cctx.localNode(),
"Full text query entry read.",
EVT_CACHE_QUERY_OBJECT_READ,
CacheQueryType.FULL_TEXT.name(),
cctx.name(),
qry.queryClassName(),
qry.clause(),
null,
null,
null,
securitySubjectId(cctx),
taskName,
key0,
val0,
null,
null));
break;
}
}
if (rdc != null) {
if (key0 == null)
key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, key, qry.keepBinary(), false, null);
if (val0 == null)
val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, val, qry.keepBinary(), false, null);
Cache.Entry<K, V> entry = new CacheEntryImpl(key0, val0);
// Reduce.
if (!rdc.collect(entry) || !iter.hasNext()) {
onPageReady(loc, qryInfo, null, Collections.singletonList(rdc.reduce()), true, null);
pageSent = true;
break;
}
else
continue;
}
else {
if (type == TEXT)
// (K, V, score). Value transfers as BinaryObject.
data.add(row0);
else
data.add(new T2<>(key, val));
}
}
if (!loc) {
if (++cnt == pageSize || !iter.hasNext()) {
boolean finished = !iter.hasNext();
onPageReady(loc, qryInfo, res.metadata(), data, finished, null);
pageSent = true;
res.onPageSend();
if (!finished)
rmvIter = false;
if (!qryInfo.allPages())
return;
data = new ArrayList<>(pageSize);
if (stop)
break; // while
}
}
}
if (!pageSent) {
if (rdc == null)
onPageReady(loc, qryInfo, res.metadata(), data, true, null);
else
onPageReady(loc, qryInfo, res.metadata(), Collections.singletonList(rdc.reduce()), true, null);
res.onPageSend();
}
}
catch (Throwable e) {
if (X.hasCause(e, ClassNotFoundException.class) && !qry.keepBinary() && cctx.binaryMarshaller() &&
!cctx.localNode().isClient() && !log.isQuiet()) {
LT.warn(log, "Suggestion for the cause of ClassNotFoundException");
LT.warn(log, "To disable, set -D" + IGNITE_QUIET + "=true");
LT.warn(log, " ^-- Ignite configured to use BinaryMarshaller but keepBinary is false for " +
"request");
LT.warn(log, " ^-- Server node need to load definition of data classes. " +
"It can be reason of ClassNotFoundException(consider IgniteCache.withKeepBinary to fix)");
LT.warn(log, "Refer this page for detailed information: " +
"https://apacheignite.readme.io/docs/binary-marshaller");
}
if (!X.hasCause(e, GridDhtUnreservedPartitionException.class))
U.error(log, "Failed to run query [qry=" + qryInfo + ", node=" + cctx.nodeId() + "]", e);
onPageReady(loc, qryInfo, null, null, true, e);
if (e instanceof Error)
throw (Error)e;
}
finally {
if (loc) {
// Local iterators are always removed.
if (res != null) {
try {
res.closeIfNotShared(recipient(qryInfo.senderId(), qryInfo.requestId()));
}
catch (IgniteCheckedException e) {
if (!X.hasCause(e, GridDhtUnreservedPartitionException.class))
U.error(log, "Failed to close local iterator [qry=" + qryInfo + ", node=" +
cctx.nodeId() + "]", e);
}
}
}
else if (rmvIter)
removeQueryResult(qryInfo.senderId(), qryInfo.requestId());
if (performanceStatsEnabled) {
IoStatisticsHolder stat = IoStatisticsQueryHelper.finishGatheringQueryStatistics();
if (stat.logicalReads() > 0 || stat.physicalReads() > 0) {
cctx.kernalContext().performanceStatistics().queryReads(
res.type(),
qryInfo.senderId(),
qryInfo.requestId(),
stat.logicalReads(),
stat.physicalReads());
}
}
}
}
finally {
leaveBusy();
}
}
/**
* Process local scan query.
*
* @param qry Query.
* @param updateStatistics Update statistics flag.
*/
@SuppressWarnings({"unchecked"})
protected GridCloseableIterator scanQueryLocal(final GridCacheQueryAdapter qry,
boolean updateStatistics) throws IgniteCheckedException {
if (!enterBusy())
throw new IllegalStateException("Failed to process query request (grid is stopping).");
final boolean statsEnabled = cctx.statisticsEnabled();
updateStatistics &= statsEnabled;
long startTime = U.currentTimeMillis();
final String namex = cctx.name();
final InternalScanFilter<K, V> intFilter = qry.scanFilter() != null ?
new InternalScanFilter<>(qry.scanFilter()) : null;
try {
assert qry.type() == SCAN;
if (log.isDebugEnabled())
log.debug("Running local SCAN query: " + qry);
final String taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash());
final ClusterNode locNode = cctx.localNode();
if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
cctx.gridEvents().record(new CacheQueryExecutedEvent<>(
locNode,
"Scan query executed.",
EVT_CACHE_QUERY_EXECUTED,
CacheQueryType.SCAN.name(),
namex,
null,
null,
intFilter != null ? intFilter.scanFilter() : null,
null,
null,
securitySubjectId(cctx),
taskName));
}
IgniteClosure transformer = qry.transform();
injectResources(transformer);
GridCloseableIterator it = scanIterator(qry, transformer, true);
updateStatistics = false;
return it;
}
catch (Exception e) {
if (intFilter != null)
intFilter.close();
if (updateStatistics)
cctx.queries().collectMetrics(GridCacheQueryType.SCAN, namex, startTime,
U.currentTimeMillis() - startTime, true);
throw e;
}
finally {
leaveBusy();
}
}
/**
* @param qryInfo Info.
* @param taskName Task name.
* @return Iterator.
* @throws IgniteCheckedException In case of error.
*/
@Nullable private QueryResult<K, V> queryResult(final GridCacheQueryInfo qryInfo,
String taskName) throws IgniteCheckedException {
assert qryInfo != null;
final UUID sndId = qryInfo.senderId();
assert sndId != null;
RequestFutureMap futs = qryIters.get(sndId);
if (futs == null) {
futs = new RequestFutureMap() {
@Override protected boolean removeEldestEntry(Map.Entry<Long, GridFutureAdapter<QueryResult<K, V>>> e) {
boolean rmv = size() > maxIterCnt;
if (rmv) {
try {
e.getValue().get().closeIfNotShared(recipient(sndId, e.getKey()));
}
catch (IgniteCheckedException ex) {
U.error(log, "Failed to close query iterator.", ex);
}
}
return rmv;
}
};
RequestFutureMap old = qryIters.putIfAbsent(sndId, futs);
if (old != null)
futs = old;
}
assert futs != null;
GridFutureAdapter<QueryResult<K, V>> fut;
boolean exec = false;
synchronized (futs) {
if (futs.isCanceled(qryInfo.requestId()))
return null;
fut = futs.get(qryInfo.requestId());
if (fut == null) {
futs.put(qryInfo.requestId(), fut = new GridFutureAdapter<>());
exec = true;
}
}
if (exec) {
try {
fut.onDone(executeQuery(qryInfo.query(), qryInfo.arguments(), qryInfo.transformer(), false,
taskName, recipient(qryInfo.senderId(), qryInfo.requestId())));
}
catch (Throwable e) {
fut.onDone(e);
if (e instanceof Error)
throw (Error)e;
}
}
return fut.get();
}
/**
* @param sndId Sender node ID.
* @param reqId Request ID.
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public void removeQueryResult(@Nullable UUID sndId, long reqId) {
if (sndId == null)
return;
RequestFutureMap futs = qryIters.get(sndId);
if (futs != null) {
IgniteInternalFuture<QueryResult<K, V>> fut;
synchronized (futs) {
fut = futs.remove(reqId);
}
if (fut != null) {
try {
fut.get().closeIfNotShared(recipient(sndId, reqId));
}
catch (IgniteCheckedException e) {
if (!X.hasCause(e, GridDhtUnreservedPartitionException.class))
U.error(log, "Failed to close iterator.", e);
}
}
}
}
/**
* @param sndId Sender node ID.
* @param reqId Request ID.
* @return Recipient ID.
*/
private static Object recipient(UUID sndId, long reqId) {
assert sndId != null;
return new IgniteBiTuple<>(sndId, reqId);
}
/**
* @param qryInfo Info.
* @return Iterator.
* @throws IgniteCheckedException In case of error.
*/
private FieldsResult fieldsQueryResult(GridCacheQueryInfo qryInfo, String taskName)
throws IgniteCheckedException {
final UUID sndId = qryInfo.senderId();
assert sndId != null;
Map<Long, GridFutureAdapter<FieldsResult>> iters = fieldsQryRes.get(sndId);
if (iters == null) {
iters = new LinkedHashMap<Long, GridFutureAdapter<FieldsResult>>(16, 0.75f, true) {
@Override protected boolean removeEldestEntry(Map.Entry<Long,
GridFutureAdapter<FieldsResult>> e) {
boolean rmv = size() > maxIterCnt;
if (rmv) {
try {
e.getValue().get().closeIfNotShared(recipient(sndId, e.getKey()));
}
catch (IgniteCheckedException ex) {
U.error(log, "Failed to close fields query iterator.", ex);
}
}
return rmv;
}
@Override public boolean equals(Object o) {
return o == this;
}
};
Map<Long, GridFutureAdapter<FieldsResult>> old = fieldsQryRes.putIfAbsent(sndId, iters);
if (old != null)
iters = old;
}
return fieldsQueryResult(iters, qryInfo, taskName);
}
/**
* @param resMap Results map.
* @param qryInfo Info.
* @return Fields query result.
* @throws IgniteCheckedException In case of error.
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
private FieldsResult fieldsQueryResult(Map<Long, GridFutureAdapter<FieldsResult>> resMap,
GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException {
assert resMap != null;
assert qryInfo != null;
GridFutureAdapter<FieldsResult> fut;
boolean exec = false;
synchronized (resMap) {
fut = resMap.get(qryInfo.requestId());
if (fut == null) {
resMap.put(qryInfo.requestId(), fut = new GridFutureAdapter<>());
exec = true;
}
}
if (exec) {
try {
fut.onDone(executeFieldsQuery(qryInfo.query(), qryInfo.arguments(), false,
taskName, recipient(qryInfo.senderId(), qryInfo.requestId())));
}
catch (IgniteCheckedException e) {
fut.onDone(e);
}
}
return fut.get();
}
/**
* @param sndId Sender node ID.
* @param reqId Request ID.
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
protected void removeFieldsQueryResult(@Nullable UUID sndId, long reqId) {
if (sndId == null)
return;
Map<Long, GridFutureAdapter<FieldsResult>> futs = fieldsQryRes.get(sndId);
if (futs != null) {
IgniteInternalFuture<FieldsResult> fut;
synchronized (futs) {
fut = futs.remove(reqId);
}
if (fut != null) {
assert fut.isDone();
try {
fut.get().closeIfNotShared(recipient(sndId, reqId));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to close iterator.", e);
}
}
}
}
/**
* Called when data for page is ready.
*
* @param loc Local query or not.
* @param qryInfo Query info.
* @param metaData Meta data.
* @param data Result data.
* @param finished Last page or not.
* @param e Exception in case of error.
* @return {@code true} if page was processed right.
*/
protected abstract boolean onPageReady(boolean loc, GridCacheQueryInfo qryInfo, @Nullable IndexQueryResultMeta metaData,
@Nullable Collection<?> data, boolean finished, @Nullable Throwable e);
/**
* @param loc Local query or not.
* @param qryInfo Query info.
* @param metaData Meta data.
* @param entities Indexing entities.
* @param data Data.
* @param finished Last page or not.
* @param e Exception in case of error.
* @return {@code true} if page was processed right.
*/
protected abstract boolean onFieldsPageReady(boolean loc, GridCacheQueryInfo qryInfo,
@Nullable List<GridQueryFieldMetadata> metaData,
@Nullable Collection<?> entities,
@Nullable Collection<?> data,
boolean finished, @Nullable Throwable e);
/**
* Gets cache queries metrics.
*
* @return Cache queries metrics.
*/
public QueryMetrics metrics() {
return metrics.snapshot();
}
/**
* Gets cache queries detailed metrics. Detail metrics could be enabled by setting non-zero value via {@link
* CacheConfiguration#setQueryDetailMetricsSize(int)}
*
* @return Cache queries metrics aggregated by query type and query text.
*/
public Collection<GridCacheQueryDetailMetricsAdapter> detailMetrics() {
if (detailMetricsSz > 0) {
// Return no more than latest detailMetricsSz items.
if (detailMetrics.size() > detailMetricsSz) {
GridBoundedPriorityQueue<GridCacheQueryDetailMetricsAdapter> latestMetrics =
new GridBoundedPriorityQueue<>(detailMetricsSz, QRY_DETAIL_METRICS_PRIORITY_NEW_CMP);
latestMetrics.addAll(detailMetrics.values());
return latestMetrics;
}
return new ArrayList<>(detailMetrics.values());
}
return Collections.emptyList();
}
/**
* Evict detail metrics.
*/
public void evictDetailMetrics() {
if (detailMetricsSz > 0) {
int sz = detailMetrics.size();
if (sz > detailMetricsSz) {
// Limit number of metrics to evict in order make eviction time predictable.
int evictCnt = Math.min(QRY_DETAIL_METRICS_EVICTION_LIMIT, sz - detailMetricsSz);
Queue<GridCacheQueryDetailMetricsAdapter> metricsToEvict =
new GridBoundedPriorityQueue<>(evictCnt, QRY_DETAIL_METRICS_PRIORITY_OLD_CMP);
metricsToEvict.addAll(detailMetrics.values());
for (GridCacheQueryDetailMetricsAdapter m : metricsToEvict)
detailMetrics.remove(m.key());
}
}
}
/**
* Resets metrics.
*/
public void resetMetrics() {
metrics.reset();
}
/**
* Resets detail metrics.
*/
public void resetDetailMetrics() {
if (detailMetrics != null)
detailMetrics.clear();
}
/**
* @param qryType Query type.
* @param qry Query description.
* @param startTime Query start size.
* @param duration Execution duration.
* @param failed {@code True} if query execution failed.
*/
public void collectMetrics(GridCacheQueryType qryType, String qry, long startTime, long duration, boolean failed) {
metrics.update(duration, failed);
if (detailMetricsSz > 0) {
// Do not collect metrics for EXPLAIN queries.
if (qryType == SQL_FIELDS && !F.isEmpty(qry)) {
int off = 0;
int len = qry.length();
while (off < len && Character.isWhitespace(qry.charAt(off)))
off++;
if (qry.regionMatches(true, off, "EXPLAIN", 0, 7))
return;
}
GridCacheQueryDetailMetricsAdapter m = new GridCacheQueryDetailMetricsAdapter(qryType, qry,
cctx.name(), startTime, duration, failed);
GridCacheQueryDetailMetricsKey key = m.key();
detailMetrics.merge(key, m, QRY_DETAIL_METRICS_MERGE_FX);
}
}
/**
* Gets SQL metadata asynchronously.
*
* @return SQL metadata future.
* @throws IgniteCheckedException In case of error.
*/
public IgniteInternalFuture<Collection<GridCacheSqlMetadata>> sqlMetadataAsync() throws IgniteCheckedException {
if (!enterBusy())
throw new IllegalStateException("Failed to get metadata (grid is stopping).");
try {
Callable<Collection<CacheSqlMetadata>> job = new MetadataJob();
// Remote nodes that have current cache.
Collection<ClusterNode> nodes = CU.affinityNodes(cctx, AffinityTopologyVersion.NONE);
Collection<Collection<CacheSqlMetadata>> res = new ArrayList<>(nodes.size() + 1);
IgniteInternalFuture<Collection<Collection<CacheSqlMetadata>>> rmtFut = null;
// Get metadata from remote nodes.
if (!nodes.isEmpty())
rmtFut = cctx.closures().callAsync(
BROADCAST,
Collections.singleton(job),
options(nodes)
.withFailoverDisabled()
.asSystemTask()
);
// Get local metadata.
IgniteInternalFuture<Collection<CacheSqlMetadata>> locFut = cctx.closures().callLocalSafe(job, true);
res.add(locFut.get());
if (rmtFut == null)
return new GridFinishedFuture<>(convertMetadata(res));
return rmtFut.chain(
new IgniteClosureX<IgniteInternalFuture<Collection<Collection<CacheSqlMetadata>>>, Collection<GridCacheSqlMetadata>>() {
@Override public Collection<GridCacheSqlMetadata> applyx(
IgniteInternalFuture<Collection<Collection<CacheSqlMetadata>>> fut) throws IgniteCheckedException {
res.addAll(fut.get());
return convertMetadata(res);
}
}
);
}
finally {
leaveBusy();
}
}
/**
* Transforms collections of {@link CacheSqlMetadata} collected from nodes into collection of {@link
* GridCacheSqlMetadata}.
*
* @param res collections of metadata from nodes.
* @return collection of aggregated metadata.
*/
@NotNull private Collection<GridCacheSqlMetadata> convertMetadata(
Collection<Collection<CacheSqlMetadata>> res) {
Map<String, Collection<CacheSqlMetadata>> map = new HashMap<>();
for (Collection<CacheSqlMetadata> col : res) {
for (CacheSqlMetadata meta : col) {
String name = meta.cacheName();
Collection<CacheSqlMetadata> cacheMetas = map.computeIfAbsent(name, k -> new LinkedList<>());
cacheMetas.add(meta);
}
}
Collection<GridCacheSqlMetadata> col = new ArrayList<>(map.size());
// Metadata for current cache must be first in list.
col.add(new CacheSqlMetadata(map.remove(cacheName)));
for (Collection<CacheSqlMetadata> metas : map.values())
col.add(new CacheSqlMetadata(metas));
return col;
}
/**
* Gets SQL metadata.
*
* @return SQL metadata.
* @throws IgniteCheckedException In case of error.
*/
public Collection<GridCacheSqlMetadata> sqlMetadata() throws IgniteCheckedException {
return sqlMetadataAsync().get();
}
/**
* Gets SQL metadata with not nulls fields.
*
* @return SQL metadata.
* @throws IgniteCheckedException In case of error.
*/
public Collection<GridCacheSqlMetadata> sqlMetadataV2() throws IgniteCheckedException {
if (!enterBusy())
throw new IllegalStateException("Failed to get metadata (grid is stopping).");
try {
Callable<Collection<CacheSqlMetadata>> job = new GridCacheQuerySqlMetadataJobV2();
// Remote nodes that have current cache.
Collection<ClusterNode> nodes = CU.affinityNodes(cctx, AffinityTopologyVersion.NONE);
Collection<Collection<CacheSqlMetadata>> res = new ArrayList<>(nodes.size() + 1);
IgniteInternalFuture<Collection<Collection<CacheSqlMetadata>>> rmtFut = null;
// Get metadata from remote nodes.
if (!nodes.isEmpty()) {
boolean allNodesNew = true;
for (ClusterNode n : nodes) {
if (n.version().compareTo(NOT_NULLS_SUPPORT_VER) < 0)
allNodesNew = false;
}
if (!allNodesNew)
return sqlMetadata();
rmtFut = cctx.closures().callAsync(
BROADCAST,
Collections.singleton(job),
options(nodes)
.withFailoverDisabled()
.asSystemTask()
);
}
// Get local metadata.
IgniteInternalFuture<Collection<CacheSqlMetadata>> locFut = cctx.closures().callLocalSafe(job, true);
if (rmtFut != null)
res.addAll(rmtFut.get());
res.add(locFut.get());
Map<String, Collection<CacheSqlMetadata>> map = new HashMap<>();
for (Collection<CacheSqlMetadata> col : res) {
for (CacheSqlMetadata meta : col) {
String name = meta.cacheName();
Collection<CacheSqlMetadata> cacheMetas = map.get(name);
if (cacheMetas == null)
map.put(name, cacheMetas = new LinkedList<>());
cacheMetas.add(meta);
}
}
Collection<GridCacheSqlMetadata> col = new ArrayList<>(map.size());
// Metadata for current cache must be first in list.
col.add(new GridCacheQuerySqlMetadataV2(map.remove(cacheName)));
for (Collection<CacheSqlMetadata> metas : map.values())
col.add(new GridCacheQuerySqlMetadataV2(metas));
return col;
}
finally {
leaveBusy();
}
}
/**
* @return Topology version for query requests.
*/
public AffinityTopologyVersion queryTopologyVersion() {
return qryTopVer;
}
/**
* @param qry Query.
* @return Filter.
*/
private IndexingQueryFilter filter(GridCacheQueryAdapter<?> qry) {
return filter(qry, null, false);
}
/**
* @param qry Query.
* @param partsArr Array of partitions to apply specified query.
* @param treatReplicatedAsPartitioned If true, only primary partitions of replicated caches will be used.
* @return Filter.
*/
private IndexingQueryFilter filter(GridCacheQueryAdapter<?> qry, @Nullable int[] partsArr, boolean treatReplicatedAsPartitioned) {
if (qry.includeBackups())
return null;
return new IndexingQueryFilterImpl(cctx.kernalContext(), AffinityTopologyVersion.NONE, partsArr, treatReplicatedAsPartitioned);
}
/**
* Prints memory statistics for debugging purposes.
*/
@Override public void printMemoryStats() {
X.println(">>>");
X.println(">>> Query manager memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + ", cache=" + cctx.name() + ']');
}
/**
* FOR TESTING ONLY
*
* @return Cache name for this query manager.
*/
public String cacheName() {
return cacheName;
}
/**
* Metadata job.
*/
@GridInternal
private static class MetadataJob implements IgniteCallable<Collection<CacheSqlMetadata>> {
/** */
private static final long serialVersionUID = 0L;
/**
* Number of fields to report when no fields defined. Includes _key and _val columns.
*/
private static final int NO_FIELDS_COLUMNS_COUNT = 2;
/** Grid */
@IgniteInstanceResource
private Ignite ignite;
/** {@inheritDoc} */
@Override public Collection<CacheSqlMetadata> call() {
final GridKernalContext ctx = ((IgniteKernal)ignite).context();
Collection<String> cacheNames = F.viewReadOnly(ctx.cache().caches(),
new C1<IgniteInternalCache<?, ?>, String>() {
@Override public String apply(IgniteInternalCache<?, ?> c) {
return c.name();
}
},
new P1<IgniteInternalCache<?, ?>>() {
@Override public boolean apply(IgniteInternalCache<?, ?> c) {
return !CU.isSystemCache(c.name()) && !DataStructuresProcessor.isDataStructureCache(c.name());
}
}
);
return F.transform(cacheNames, new C1<String, CacheSqlMetadata>() {
@Override public CacheSqlMetadata apply(String cacheName) {
Collection<GridQueryTypeDescriptor> types = ctx.query().types(cacheName);
Collection<String> names = U.newHashSet(types.size());
Map<String, String> keyClasses = U.newHashMap(types.size());
Map<String, String> valClasses = U.newHashMap(types.size());
Map<String, Map<String, String>> fields = U.newHashMap(types.size());
Map<String, Collection<GridCacheSqlIndexMetadata>> indexes = U.newHashMap(types.size());
for (GridQueryTypeDescriptor type : types) {
// Filter internal types (e.g., data structures).
if (type.name().startsWith("GridCache"))
continue;
names.add(type.name());
keyClasses.put(type.name(), type.keyClass().getName());
valClasses.put(type.name(), type.valueClass().getName());
int size = type.fields().isEmpty() ? NO_FIELDS_COLUMNS_COUNT : type.fields().size();
Map<String, String> fieldsMap = U.newLinkedHashMap(size);
// _KEY and _VAL are not included in GridIndexingTypeDescriptor.valueFields
if (type.fields().isEmpty()) {
fieldsMap.put("_KEY", type.keyClass().getName());
fieldsMap.put("_VAL", type.valueClass().getName());
}
for (Map.Entry<String, Class<?>> e : type.fields().entrySet())
fieldsMap.put(e.getKey().toUpperCase(), e.getValue().getName());
fields.put(type.name(), fieldsMap);
Map<String, GridQueryIndexDescriptor> idxs = type.indexes();
Collection<GridCacheSqlIndexMetadata> indexesCol = new ArrayList<>(idxs.size());
for (Map.Entry<String, GridQueryIndexDescriptor> e : idxs.entrySet()) {
GridQueryIndexDescriptor desc = e.getValue();
// Add only SQL indexes.
if (desc.type() == QueryIndexType.SORTED) {
Collection<String> idxFields = new LinkedList<>();
Collection<String> descendings = new LinkedList<>();
for (String idxField : e.getValue().fields()) {
String idxFieldUpper = idxField.toUpperCase();
idxFields.add(idxFieldUpper);
if (desc.descending(idxField))
descendings.add(idxFieldUpper);
}
indexesCol.add(new CacheSqlIndexMetadata(e.getKey().toUpperCase(),
idxFields, descendings, false));
}
}
indexes.put(type.name(), indexesCol);
}
return new CacheSqlMetadata(cacheName, names, keyClasses, valClasses, fields, indexes);
}
});
}
}
/**
* Cache metadata.
*/
public static class CacheSqlMetadata implements GridCacheSqlMetadata {
/** */
private static final long serialVersionUID = 0L;
/** */
private String cacheName;
/** */
private Collection<String> types;
/** */
private Map<String, String> keyClasses;
/** */
private Map<String, String> valClasses;
/** */
private Map<String, Map<String, String>> fields;
/** */
private Map<String, Collection<GridCacheSqlIndexMetadata>> indexes;
/**
* Required by {@link Externalizable}.
*/
public CacheSqlMetadata() {
// No-op.
}
/**
* @param cacheName Cache name.
* @param types Types.
* @param keyClasses Key classes map.
* @param valClasses Value classes map.
* @param fields Fields maps.
* @param indexes Indexes.
*/
CacheSqlMetadata(@Nullable String cacheName, Collection<String> types, Map<String, String> keyClasses,
Map<String, String> valClasses, Map<String, Map<String, String>> fields,
Map<String, Collection<GridCacheSqlIndexMetadata>> indexes) {
assert types != null;
assert keyClasses != null;
assert valClasses != null;
assert fields != null;
assert indexes != null;
this.cacheName = cacheName;
this.types = types;
this.keyClasses = keyClasses;
this.valClasses = valClasses;
this.fields = fields;
this.indexes = indexes;
}
/**
* @param metas Meta data instances from different nodes.
*/
CacheSqlMetadata(Iterable<CacheSqlMetadata> metas) {
types = new HashSet<>();
keyClasses = new HashMap<>();
valClasses = new HashMap<>();
fields = new HashMap<>();
indexes = new HashMap<>();
for (CacheSqlMetadata meta : metas) {
if (cacheName == null)
cacheName = meta.cacheName;
else
assert F.eq(cacheName, meta.cacheName);
types.addAll(meta.types);
keyClasses.putAll(meta.keyClasses);
valClasses.putAll(meta.valClasses);
fields.putAll(meta.fields);
indexes.putAll(meta.indexes);
}
}
/** {@inheritDoc} */
@Override public String cacheName() {
return cacheName;
}
/** {@inheritDoc} */
@Override public Collection<String> types() {
return types;
}
/** {@inheritDoc} */
@Override public String keyClass(String type) {
return keyClasses.get(type);
}
/** {@inheritDoc} */
@Override public String valueClass(String type) {
return valClasses.get(type);
}
/** {@inheritDoc} */
@Override public Map<String, String> fields(String type) {
return fields.get(type);
}
/** {@inheritDoc} */
@Override public Collection<String> notNullFields(String type) {
return null;
}
/** {@inheritDoc} */
@Override public Map<String, String> keyClasses() {
return keyClasses;
}
/** {@inheritDoc} */
@Override public Map<String, String> valClasses() {
return valClasses;
}
/** {@inheritDoc} */
@Override public Map<String, Map<String, String>> fields() {
return fields;
}
/** {@inheritDoc} */
@Override public Map<String, Collection<GridCacheSqlIndexMetadata>> indexes() {
return indexes;
}
/** {@inheritDoc} */
@Override public Collection<GridCacheSqlIndexMetadata> indexes(String type) {
return indexes.get(type);
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeString(out, cacheName);
U.writeCollection(out, types);
U.writeMap(out, keyClasses);
U.writeMap(out, valClasses);
U.writeMap(out, fields);
U.writeMap(out, indexes);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
cacheName = U.readString(in);
types = U.readCollection(in);
keyClasses = U.readMap(in);
valClasses = U.readMap(in);
fields = U.readMap(in);
indexes = U.readMap(in);
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheSqlMetadata.class, this);
}
}
/**
* Cache metadata index.
*/
public static class CacheSqlIndexMetadata implements GridCacheSqlIndexMetadata {
/** */
private static final long serialVersionUID = 0L;
/** */
private String name;
/** */
private Collection<String> fields;
/** */
private Collection<String> descendings;
/** */
private boolean unique;
/**
* Required by {@link Externalizable}.
*/
public CacheSqlIndexMetadata() {
// No-op.
}
/**
* @param name Index name.
* @param fields Fields.
* @param descendings Descendings.
* @param unique Unique flag.
*/
CacheSqlIndexMetadata(String name, Collection<String> fields, Collection<String> descendings,
boolean unique) {
assert name != null;
assert fields != null;
assert descendings != null;
this.name = name;
this.fields = fields;
this.descendings = descendings;
this.unique = unique;
}
/** {@inheritDoc} */
@Override public String name() {
return name;
}
/** {@inheritDoc} */
@Override public Collection<String> fields() {
return fields;
}
/** {@inheritDoc} */
@Override public boolean descending(String field) {
return descendings.contains(field);
}
/** {@inheritDoc} */
@Override public Collection<String> descendings() {
return descendings;
}
/** {@inheritDoc} */
@Override public boolean unique() {
return unique;
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeString(out, name);
U.writeCollection(out, fields);
U.writeCollection(out, descendings);
out.writeBoolean(unique);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
name = U.readString(in);
fields = U.readCollection(in);
descendings = U.readCollection(in);
unique = in.readBoolean();
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheSqlIndexMetadata.class, this);
}
}
/**
*
*/
public static class QueryResult<K, V> extends CachedResult<IgniteBiTuple<K, V>> {
/** */
private final GridCacheQueryType type;
/** Future of query result metadata. Completed when query actually started. */
private final CompletableFuture<IndexQueryResultMeta> metadata;
/** Flag shows whether first result page was delivered to user. */
private volatile boolean sentFirst;
/**
* @param type Query type.
* @param rcpt ID of the recipient.
*/
private QueryResult(GridCacheQueryType type, Object rcpt) {
super(rcpt);
this.type = type;
metadata = type == INDEX ? new CompletableFuture<>() : null;
}
/**
* @return Type.
*/
public GridCacheQueryType type() {
return type;
}
/** */
public IndexQueryResultMeta metadata() {
if (sentFirst || metadata == null)
return null;
assert metadata.isDone() : "QueryResult metadata isn't completed yet.";
return metadata.getNow(null);
}
/** */
public void metadata(IndexQueryResultMeta metadata) {
if (this.metadata != null)
this.metadata.complete(metadata);
}
/** Callback to invoke, when next data page was delivered to user. */
public void onPageSend() {
sentFirst = true;
}
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable IgniteSpiCloseableIterator<IgniteBiTuple<K, V>> res, @Nullable Throwable err) {
boolean done = super.onDone(res, err);
if (done && err != null && metadata != null)
metadata.completeExceptionally(err);
return done;
}
}
/**
*
*/
private static class FieldsResult<Q> extends CachedResult<Q> {
/** */
private static final long serialVersionUID = 0L;
/** */
private List<GridQueryFieldMetadata> meta;
/**
* @param rcpt ID of the recipient.
*/
FieldsResult(Object rcpt) {
super(rcpt);
}
/**
* @return Metadata.
* @throws IgniteCheckedException On error.
*/
public List<GridQueryFieldMetadata> metaData() throws IgniteCheckedException {
get(); // Ensure that result is ready.
return meta;
}
/**
* @param meta Metadata.
*/
public void metaData(List<GridQueryFieldMetadata> meta) {
this.meta = meta;
}
}
/**
* Cached result.
*/
private abstract static class CachedResult<R> extends GridFutureAdapter<IgniteSpiCloseableIterator<R>> {
/** Absolute position of each recipient. */
private final Map<Object, QueueIterator> recipients = new GridLeanMap<>(1);
/** */
private CircularQueue<R> queue;
/** */
private int pruned;
/**
* @param rcpt ID of the recipient.
*/
protected CachedResult(Object rcpt) {
boolean res = addRecipient(rcpt);
assert res;
}
/**
* Close if this result does not have any other recipients.
*
* @param rcpt ID of the recipient.
* @throws IgniteCheckedException If failed.
*/
public void closeIfNotShared(Object rcpt) throws IgniteCheckedException {
assert isDone();
synchronized (recipients) {
if (recipients.isEmpty())
return;
recipients.remove(rcpt);
if (recipients.isEmpty() && error() == null)
get().close();
}
}
/**
* @param rcpt ID of the recipient.
* @return {@code true} If the recipient successfully added.
*/
public boolean addRecipient(Object rcpt) {
synchronized (recipients) {
if (isDone())
return false;
assert !recipients.containsKey(rcpt) : rcpt + " -> " + recipients;
recipients.put(rcpt, new QueueIterator(rcpt));
}
return true;
}
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable IgniteSpiCloseableIterator<R> res, @Nullable Throwable err) {
assert !isDone();
synchronized (recipients) {
if (recipients.size() > 1) {
queue = new CircularQueue<>(128);
for (QueueIterator it : recipients.values())
it.init();
}
return super.onDone(res, err);
}
}
/**
*
*/
private void pruneQueue() {
assert !recipients.isEmpty();
assert Thread.holdsLock(recipients);
int minPos = Collections.min(recipients.values()).pos;
if (minPos > pruned) {
queue.remove(minPos - pruned);
pruned = minPos;
}
}
/**
* @param rcpt ID of the recipient.
* @throws IgniteCheckedException If failed.
*/
public IgniteSpiCloseableIterator<R> iterator(Object rcpt) throws IgniteCheckedException {
assert rcpt != null;
IgniteSpiCloseableIterator<R> it = get();
assert it != null;
synchronized (recipients) {
return queue == null ? it : recipients.get(rcpt);
}
}
/**
*
*/
@SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
private class QueueIterator implements IgniteSpiCloseableIterator<R>, Comparable<QueueIterator> {
/** */
private static final long serialVersionUID = 0L;
/** */
private static final int NEXT_SIZE = 64;
/** */
private final Object rcpt;
/** */
private int pos;
/** */
private Queue<R> next;
/**
* @param rcpt ID of the recipient.
*/
private QueueIterator(Object rcpt) {
this.rcpt = rcpt;
}
/**
*
*/
public void init() {
assert next == null;
next = new ArrayDeque<>(NEXT_SIZE);
}
/** {@inheritDoc} */
@Override public void close() throws IgniteCheckedException {
closeIfNotShared(rcpt);
}
/** {@inheritDoc} */
@Override public boolean hasNext() {
return !next.isEmpty() || fillNext();
}
/** {@inheritDoc} */
@SuppressWarnings("IteratorNextCanNotThrowNoSuchElementException") // It can actually.
@Override public R next() {
return next.remove();
}
/**
* @return {@code true} If elements were fetched into local queue of the iterator.
*/
private boolean fillNext() {
assert next.isEmpty();
IgniteSpiCloseableIterator<R> it;
try {
it = get();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
synchronized (recipients) {
for (int i = 0; i < NEXT_SIZE; i++) {
R res;
int off = pos - pruned; // Offset of current iterator relative to queue begin.
if (off == queue.size()) { // We are leading the race.
if (!it.hasNext())
break; // Happy end.
res = it.next();
queue.add(res);
}
else // Someone fetched result into queue before us.
res = queue.get(off);
assert res != null;
pos++;
next.add(res);
}
pruneQueue();
}
return !next.isEmpty();
}
/** {@inheritDoc} */
@Override public void remove() {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public int compareTo(QueueIterator o) {
return Integer.compare(pos, o.pos);
}
}
}
/**
* Queue.
*/
@SuppressWarnings("PackageVisibleInnerClass")
static class CircularQueue<R> {
/** */
private int off;
/** */
private int size;
/** */
private R[] arr;
/**
* @param cap Initial capacity.
*/
CircularQueue(int cap) {
assert U.isPow2(cap);
arr = (R[])new Object[cap];
}
/**
* @param o Object to add.
*/
public void add(R o) {
if (size == arr.length) { // Resize.
Object[] newArr = new Object[arr.length << 1];
int tailSize = arr.length - off;
System.arraycopy(arr, off, newArr, 0, tailSize);
if (off != 0) {
System.arraycopy(arr, 0, newArr, tailSize, off);
off = 0;
}
arr = (R[])newArr;
}
int idx = (off + size) & (arr.length - 1);
assert arr[idx] == null;
arr[idx] = o;
size++;
}
/**
* @param n Number of elements to remove.
*/
public void remove(int n) {
assert n > 0 : n;
assert n <= size : n + " " + size;
int mask = arr.length - 1;
for (int i = 0; i < n; i++) {
int idx = (off + i) & mask;
assert arr[idx] != null;
arr[idx] = null;
}
size -= n;
off += n;
if (off >= arr.length)
off -= arr.length;
}
/**
* @param idx Index in queue.
* @return Element at the given index.
*/
public R get(int idx) {
assert idx >= 0 : idx;
assert idx < size : idx + " " + size;
R res = arr[(idx + off) & (arr.length - 1)];
assert res != null;
return res;
}
/**
* @return Size.
*/
public int size() {
return size;
}
}
/**
* Query for {@link IndexingSpi}.
*
* @param keepBinary Keep binary flag.
* @return Query.
*/
public <R> CacheQuery<R> createSpiQuery(boolean keepBinary) {
return new GridCacheQueryAdapter<>(cctx,
SPI,
null,
null,
null,
null,
false,
keepBinary,
null);
}
/**
* Creates user's predicate based scan query.
*
* @param filter Scan filter.
* @param part Partition.
* @param keepBinary Keep binary flag.
* @param dataPageScanEnabled Flag to enable data page scan.
* @return Created query.
*/
public <R> CacheQuery<R> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
@Nullable Integer part, boolean keepBinary, Boolean dataPageScanEnabled) {
return createScanQuery(filter, null, part, keepBinary, false, dataPageScanEnabled);
}
/**
* Creates user's predicate based scan query.
*
* @param filter Scan filter.
* @param trans Transformer.
* @param part Partition.
* @param keepBinary Keep binary flag.
* @param forceLocal Flag to force local scan.
* @param dataPageScanEnabled Flag to enable data page scan.
* @return Created query.
*/
@SuppressWarnings("unchecked")
public <T, R> CacheQuery<R> createScanQuery(
@Nullable IgniteBiPredicate<K, V> filter,
@Nullable IgniteClosure<T, R> trans,
@Nullable Integer part,
boolean keepBinary,
boolean forceLocal,
Boolean dataPageScanEnabled
) {
return new GridCacheQueryAdapter(cctx,
SCAN,
filter,
trans,
part,
keepBinary,
forceLocal,
dataPageScanEnabled);
}
/**
* Creates user's full text query, queried class, and query clause. For more information refer to {@link CacheQuery}
* documentation.
*
* @param clsName Query class name.
* @param search Search clause.
* @param limit Limits response records count. If 0 or less, considered to be no limit.
* @param pageSize Query page size.
* @param keepBinary Keep binary flag.
* @return Created query.
*/
public CacheQuery<Map.Entry<K, V>> createFullTextQuery(String clsName,
String search, int limit, int pageSize, boolean keepBinary) {
A.notNull("clsName", clsName);
A.notNull("search", search);
return new GridCacheQueryAdapter<Map.Entry<K, V>>(cctx,
TEXT,
clsName,
search,
null,
null,
false,
keepBinary,
null)
.limit(limit)
.pageSize(pageSize);
}
/**
* Creates index query.
*
* @param qry User query.
* @param keepBinary Keep binary flag.
* @return Created query.
*/
public <R> CacheQuery<R> createIndexQuery(IndexQuery qry, boolean keepBinary) {
if (qry.getPartition() != null) {
int part = qry.getPartition();
A.ensure(part >= 0 && part < cctx.affinity().partitions(),
"Specified partition must be in the range [0, N) where N is partition number in the cache.");
}
IndexQueryDesc desc = new IndexQueryDesc(qry.getCriteria(), qry.getIndexName(), qry.getValueType());
GridCacheQueryAdapter q = new GridCacheQueryAdapter<>(
cctx, INDEX, desc, qry.getPartition(), qry.getValueType(), qry.getFilter());
q.keepBinary(keepBinary);
return q;
}
/** @return Query iterators. */
public ConcurrentMap<UUID, RequestFutureMap> queryIterators() {
return qryIters;
}
/** @return Local query iterators. */
public GridConcurrentHashSet<ScanQueryIterator> localQueryIterators() {
return locIters;
}
/**
* The map prevents put to the map in case the specified request has been removed previously.
*/
public class RequestFutureMap extends LinkedHashMap<Long, GridFutureAdapter<QueryResult<K, V>>> {
/** */
private static final long serialVersionUID = 0L;
/** Count of canceled keys */
private static final int CANCELED_COUNT = 128;
/**
* The ID of the canceled request is stored to the set in case remove(reqId) is called before put(reqId,
* future).
*/
private Set<Long> canceled;
/** {@inheritDoc} */
@Override public GridFutureAdapter<QueryResult<K, V>> remove(Object key) {
if (containsKey(key))
return super.remove(key);
else {
if (canceled == null) {
canceled = Collections.newSetFromMap(
new LinkedHashMap<Long, Boolean>() {
@Override protected boolean removeEldestEntry(Map.Entry<Long, Boolean> eldest) {
return size() > CANCELED_COUNT;
}
});
}
canceled.add((Long)key);
return null;
}
}
/**
* @return true if the key is canceled
*/
public boolean isCanceled(Long key) {
return canceled != null && canceled.contains(key);
}
}
/** */
public static final class ScanQueryIterator<K, V> extends GridCloseableIteratorAdapter<Object> {
/** */
private static final long serialVersionUID = 0L;
/** */
private final GridDhtCacheAdapter dht;
/** */
private final GridDhtLocalPartition locPart;
/** */
private final InternalScanFilter<K, V> intScanFilter;
/** */
private final boolean statsEnabled;
/** */
private final GridIterator<CacheDataRow> it;
/** */
private final GridCacheAdapter cache;
/** */
private final AffinityTopologyVersion topVer;
/** */
private final boolean keepBinary;
/** */
private final boolean readEvt;
/** */
private final String cacheName;
/** */
private final UUID subjId;
/** */
private final String taskName;
/** */
private final IgniteClosure transform;
/** */
private final CacheObjectContext objCtx;
/** */
private final GridCacheContext cctx;
/** */
private final IgniteLogger log;
/** */
private Object next;
/** */
private boolean needAdvance;
/** */
private IgniteCacheExpiryPolicy expiryPlc;
/** */
private final boolean locNode;
/** */
private final boolean incBackups;
/** */
private final long startTime;
/** */
private final int pageSize;
/** */
@Nullable private final GridConcurrentHashSet<ScanQueryIterator> locIters;
/**
* @param it Iterator.
* @param qry Query.
* @param topVer Topology version.
* @param locPart Local partition.
* @param scanFilter Scan filter.
* @param transformer Transformer.
* @param locNode Local node flag.
* @param locIters Local iterators set.
* @param cctx Cache context.
* @param log Logger.
*/
ScanQueryIterator(
GridIterator<CacheDataRow> it,
GridCacheQueryAdapter qry,
AffinityTopologyVersion topVer,
GridDhtLocalPartition locPart,
IgniteBiPredicate<K, V> scanFilter,
IgniteClosure transformer,
boolean locNode,
@Nullable GridConcurrentHashSet<ScanQueryIterator> locIters,
GridCacheContext cctx,
IgniteLogger log) {
assert !locNode || locIters != null : "Local iterators can't be null for local query.";
this.it = it;
this.topVer = topVer;
this.locPart = locPart;
this.intScanFilter = scanFilter != null ? new InternalScanFilter<>(scanFilter) : null;
this.cctx = cctx;
this.log = log;
this.locNode = locNode;
this.locIters = locIters;
incBackups = qry.includeBackups();
statsEnabled = cctx.statisticsEnabled();
readEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ) &&
cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ);
taskName = readEvt ? cctx.kernalContext().task().resolveTaskName(qry.taskHash()) : null;
subjId = securitySubjectId(cctx);
// keep binary for remote scans if possible
keepBinary = (!locNode && scanFilter == null && transformer == null && !readEvt) || qry.keepBinary();
transform = transformer;
dht = cctx.isNear() ? cctx.near().dht() : cctx.dht();
cache = dht != null ? dht : cctx.cache();
objCtx = cctx.cacheObjectContext();
cacheName = cctx.name();
needAdvance = true;
expiryPlc = this.cctx.cache().expiryPolicy(null);
startTime = U.currentTimeMillis();
pageSize = qry.pageSize();
}
/** {@inheritDoc} */
@Override protected Object onNext() {
if (needAdvance)
advance();
else
needAdvance = true;
if (next == null)
throw new NoSuchElementException();
return next;
}
/** {@inheritDoc} */
@Override protected boolean onHasNext() {
if (needAdvance) {
advance();
needAdvance = false;
}
return next != null;
}
/** {@inheritDoc} */
@Override protected void onClose() {
if (expiryPlc != null && dht != null) {
dht.sendTtlUpdateRequest(expiryPlc);
expiryPlc = null;
}
if (locPart != null)
locPart.release();
if (intScanFilter != null)
intScanFilter.close();
if (locIters != null)
locIters.remove(this);
}
/**
* Moves the iterator to the next cache entry.
*/
private void advance() {
long start = statsEnabled ? System.nanoTime() : 0L;
Object next0 = null;
while (it.hasNext()) {
CacheDataRow row = it.next();
KeyCacheObject key = row.key();
CacheObject val;
if (expiryPlc != null) {
try {
CacheDataRow tmp = row;
while (true) {
try {
GridCacheEntryEx entry = cache.entryEx(key);
entry.unswap(tmp);
val = entry.peek(true, true, topVer, expiryPlc);
entry.touch();
break;
}
catch (GridCacheEntryRemovedException ignore) {
tmp = null;
}
}
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
log.debug("Failed to peek value: " + e);
val = null;
}
if (dht != null && expiryPlc.readyToFlush(100))
dht.sendTtlUpdateRequest(expiryPlc);
}
else
val = row.value();
// Filter backups for SCAN queries, if it isn't partition scan.
// Other types are filtered in indexing manager.
if (!cctx.isReplicated() && /*qry.partition()*/this.locPart == null && !incBackups &&
!cctx.affinity().primaryByKey(cctx.localNode(), key, topVer)) {
if (log.isDebugEnabled())
log.debug("Ignoring backup element [row=" + row +
", cacheMode=" + cctx.config().getCacheMode() + ", incBackups=" + incBackups +
", primary=" + cctx.affinity().primaryByKey(cctx.localNode(), key, topVer) + ']');
continue;
}
if (log.isDebugEnabled()) {
ClusterNode primaryNode = cctx.affinity().primaryByKey(key,
cctx.affinity().affinityTopologyVersion());
log.debug(S.toString("Record",
"key", key, true,
"val", val, true,
"incBackups", incBackups, false,
"priNode", primaryNode != null ? U.id8(primaryNode.id()) : null, false,
"node", U.id8(cctx.localNode().id()), false));
}
if (val != null) {
K key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, key, keepBinary, false);
V val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, val, keepBinary, false);
if (statsEnabled) {
CacheMetricsImpl metrics = cctx.cache().metrics0();
metrics.onRead(true);
metrics.addGetTimeNanos(System.nanoTime() - start);
}
if (intScanFilter == null || intScanFilter.apply(key0, val0)) {
if (readEvt) {
cctx.gridEvents().record(new CacheQueryReadEvent<>(
cctx.localNode(),
"Scan query entry read.",
EVT_CACHE_QUERY_OBJECT_READ,
CacheQueryType.SCAN.name(),
cacheName,
null,
null,
intScanFilter != null ? intScanFilter.scanFilter() : null,
null,
null,
subjId,
taskName,
key0,
val0,
null,
null));
}
if (transform != null) {
try {
next0 = transform.apply(new CacheQueryEntry<>(key0, val0));
}
catch (Throwable e) {
throw new IgniteException(e);
}
}
else
next0 = !locNode ? new T2<>(key0, val0) :
new CacheQueryEntry<>(key0, val0);
break;
}
}
}
if ((this.next = next0) == null && expiryPlc != null && dht != null) {
dht.sendTtlUpdateRequest(expiryPlc);
expiryPlc = null;
}
}
/** */
@Nullable public IgniteBiPredicate<K, V> filter() {
return intScanFilter == null ? null : intScanFilter.scanFilter;
}
/** */
public AffinityTopologyVersion topVer() {
return topVer;
}
/** */
public GridDhtLocalPartition localPartition() {
return locPart;
}
/** */
public IgniteClosure transformer() {
return transform;
}
/** */
public long startTime() {
return startTime;
}
/** */
public boolean local() {
return locNode;
}
/** */
public boolean keepBinary() {
return keepBinary;
}
/** */
public UUID subjectId() {
return subjId;
}
/** */
public String taskName() {
return taskName;
}
/** */
public GridCacheContext cacheContext() {
return cctx;
}
/** */
public int pageSize() {
return pageSize;
}
}
/**
* Wrap scan filter in order to catch unhandled errors.
*/
private static class InternalScanFilter<K, V> implements IgniteBiPredicate<K, V> {
/** */
private static final long serialVersionUID = 0L;
/** */
private final IgniteBiPredicate<K, V> scanFilter;
/**
* @param scanFilter User scan filter.
*/
InternalScanFilter(IgniteBiPredicate<K, V> scanFilter) {
this.scanFilter = scanFilter;
}
/** {@inheritDoc} */
@Override public boolean apply(K k, V v) {
try {
return scanFilter == null || scanFilter.apply(k, v);
}
catch (Throwable e) {
throw new IgniteException(e);
}
}
/** */
void close() {
if (scanFilter instanceof PlatformCacheEntryFilter)
((PlatformCacheEntryFilter)scanFilter).onClose();
}
/**
* @return Wrapped scan filter.
*/
IgniteBiPredicate<K, V> scanFilter() {
return scanFilter;
}
}
}