/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.h2;
import java.sql.BatchUpdateException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheServerNotFoundException;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.events.SqlQueryExecutionEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.cache.query.index.IndexName;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyTypeSettings;
import org.apache.ignite.internal.cache.query.index.sorted.QueryIndexDefinition;
import org.apache.ignite.internal.cache.query.index.sorted.client.ClientIndexDefinition;
import org.apache.ignite.internal.cache.query.index.sorted.client.ClientIndexFactory;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndex;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexFactory;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexImpl;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.managers.IgniteMBeansManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.mvcc.StaticMvccQueryTracker;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.odbc.SqlStateCode;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcParameterMeta;
import org.apache.ignite.internal.processors.query.ColumnInformation;
import org.apache.ignite.internal.processors.query.EnlistOperation;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
import org.apache.ignite.internal.processors.query.GridQueryIndexing;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryField;
import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.RunningQueryManager;
import org.apache.ignite.internal.processors.query.SqlClientContext;
import org.apache.ignite.internal.processors.query.TableInformation;
import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
import org.apache.ignite.internal.processors.query.h2.affinity.H2PartitionResolver;
import org.apache.ignite.internal.processors.query.h2.affinity.PartitionExtractor;
import org.apache.ignite.internal.processors.query.h2.database.H2TreeClientIndex;
import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndexBase;
import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo;
import org.apache.ignite.internal.processors.query.h2.dml.DmlUpdateResultsIterator;
import org.apache.ignite.internal.processors.query.h2.dml.DmlUpdateSingleEntryIterator;
import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode;
import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
import org.apache.ignite.internal.processors.query.h2.maintenance.RebuildIndexWorkflowCallback;
import org.apache.ignite.internal.processors.query.h2.mxbean.SqlQueryMXBean;
import org.apache.ignite.internal.processors.query.h2.mxbean.SqlQueryMXBeanImpl;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlConst;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
import org.apache.ignite.internal.processors.query.stat.IgniteStatisticsManager;
import org.apache.ignite.internal.processors.query.stat.IgniteStatisticsManagerImpl;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.sql.command.SqlCommand;
import org.apache.ignite.internal.sql.command.SqlCommitTransactionCommand;
import org.apache.ignite.internal.sql.command.SqlRollbackTransactionCommand;
import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult;
import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
import org.h2.api.ErrorCode;
import org.h2.api.JavaObjectSerializer;
import org.h2.engine.Session;
import org.h2.engine.SysProperties;
import org.h2.index.Index;
import org.h2.index.IndexType;
import org.h2.message.DbException;
import org.h2.table.Column;
import org.h2.table.IndexColumn;
import org.h2.table.TableType;
import org.h2.util.JdbcUtils;
import org.h2.value.CompareMode;
import org.h2.value.DataType;
import org.jetbrains.annotations.Nullable;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.Collections.singletonList;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_MVCC_TX_SIZE_CACHING_THRESHOLD;
import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;
import static org.apache.ignite.internal.cache.query.index.sorted.maintenance.MaintenanceRebuildIndexUtils.INDEX_REBUILD_MNTC_TASK_NAME;
import static org.apache.ignite.internal.cache.query.index.sorted.maintenance.MaintenanceRebuildIndexUtils.parseMaintenanceTaskParameters;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager.TX_SIZE_THRESHOLD;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkActive;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.requestSnapshot;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.txStart;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
import static org.apache.ignite.internal.processors.query.QueryUtils.matches;
import static org.apache.ignite.internal.processors.query.h2.H2Utils.UPDATE_RESULT_META;
import static org.apache.ignite.internal.processors.query.h2.H2Utils.generateFieldsQueryString;
import static org.apache.ignite.internal.processors.query.h2.H2Utils.session;
import static org.apache.ignite.internal.processors.query.h2.H2Utils.validateTypeDescriptor;
import static org.apache.ignite.internal.processors.query.h2.H2Utils.zeroCursor;
import static org.apache.ignite.internal.processors.tracing.SpanTags.ERROR;
import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_QRY_TEXT;
import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_SCHEMA;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_CMD_QRY_EXECUTE;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_CURSOR_OPEN;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_DML_QRY_EXECUTE;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_ITER_OPEN;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_QRY;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_QRY_EXECUTE;
/**
* Indexing implementation based on the H2 database engine. In this implementation the main query
* language is SQL; fulltext indexing can be performed using Lucene.
* <p>
* For each registered {@link GridQueryTypeDescriptor} this SPI will create respective SQL table with
* {@code '_key'} and {@code '_val'} fields for key and value, and fields from
* {@link GridQueryTypeDescriptor#fields()}.
* For each table it will create indexes declared in {@link GridQueryTypeDescriptor#indexes()}.
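* <p>
* For illustration only (a hypothetical {@code Person} type with {@code name} and {@code age}
* fields, not taken from this file), the generated table is roughly equivalent to:
* <pre>{@code
* CREATE TABLE PERSON (
*     _key OTHER,   -- cache entry key
*     _val OTHER,   -- cache entry value
*     NAME VARCHAR, -- from GridQueryTypeDescriptor#fields()
*     AGE INT
* );
* }</pre>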
*/
public class IgniteH2Indexing implements GridQueryIndexing {
/** Default number of attempts to re-run DELETE and UPDATE queries in case of concurrent modifications of values. */
private static final int DFLT_UPDATE_RERUN_ATTEMPTS = 4;
/** Cached value of {@code IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION}. */
private final boolean updateInTxAllowed =
Boolean.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION);
/** Index factory. Made public for test purposes. */
public static InlineIndexFactory idxFactory = InlineIndexFactory.INSTANCE;
/** Logger. */
@LoggerResource
private IgniteLogger log;
/** Node ID. */
private UUID nodeId;
/** */
private Marshaller marshaller;
/** */
private GridMapQueryExecutor mapQryExec;
/** */
private GridReduceQueryExecutor rdcQryExec;
/** */
private GridSpinBusyLock busyLock;
/** */
protected volatile GridKernalContext ctx;
/** Query context registry. */
private final QueryContextRegistry qryCtxRegistry = new QueryContextRegistry();
/** Processor to execute commands which are neither SELECT nor DML. */
private CommandProcessor cmdProc;
/** Partition reservation manager. */
private PartitionReservationManager partReservationMgr;
/** Partition extractor. */
private PartitionExtractor partExtractor;
/** Parser. */
private QueryParser parser;
/** Closure that logs errors of completed futures. */
private final IgniteInClosure<? super IgniteInternalFuture<?>> logger = new IgniteInClosure<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
try {
fut.get();
}
catch (IgniteCheckedException e) {
U.error(log, e.getMessage(), e);
}
}
};
/** Connection manager. */
private ConnectionManager connMgr;
/** Schema manager. */
private SchemaManager schemaMgr;
/** Long running query manager. */
private LongRunningQueryManager longRunningQryMgr;
/** Discovery event listener. */
private GridLocalEventListener discoLsnr;
/** Query message listener. */
private GridMessageListener qryLsnr;
/** Statistic manager. */
private IgniteStatisticsManager statsMgr;
/** Distributed config. */
private DistributedSqlConfiguration distrCfg;
/** Functions manager. */
private FunctionsManager funcMgr;
/**
* @return Kernal context.
*/
public GridKernalContext kernalContext() {
return ctx;
}
/** {@inheritDoc} */
@Override public List<JdbcParameterMeta> parameterMetaData(String schemaName, SqlFieldsQuery qry)
throws IgniteSQLException {
assert qry != null;
ArrayList<JdbcParameterMeta> metas = new ArrayList<>();
SqlFieldsQuery curQry = qry;
while (curQry != null) {
QueryParserResult parsed = parser.parse(schemaName, curQry, true);
metas.addAll(parsed.parametersMeta());
curQry = parsed.remainingQuery();
}
return metas;
}
/** {@inheritDoc} */
@Override public List<GridQueryFieldMetadata> resultMetaData(String schemaName, SqlFieldsQuery qry)
throws IgniteSQLException {
QueryParserResult parsed = parser.parse(schemaName, qry, true);
if (parsed.remainingQuery() != null)
return null;
if (parsed.isSelect())
return parsed.select().meta();
return null;
}
/** {@inheritDoc} */
@Override public void store(GridCacheContext cctx,
GridQueryTypeDescriptor type,
CacheDataRow row,
@Nullable CacheDataRow prevRow,
boolean prevRowAvailable
) throws IgniteCheckedException {
String cacheName = cctx.name();
H2TableDescriptor tbl = schemaMgr.tableForType(schema(cacheName), cacheName, type.name());
if (tbl == null)
return; // Type was rejected.
if (tbl.luceneIndex() != null) {
long expireTime = row.expireTime();
if (expireTime == 0L)
expireTime = Long.MAX_VALUE;
tbl.luceneIndex().store(row.key(), row.value(), row.version(), expireTime);
}
tbl.table().update(row, prevRow);
}
/** {@inheritDoc} */
@Override public void remove(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow row)
throws IgniteCheckedException {
if (log.isDebugEnabled()) {
log.debug("Removing key from cache query index [locId=" + nodeId +
", key=" + row.key() +
", val=" + row.value() + ']');
}
String cacheName = cctx.name();
H2TableDescriptor tbl = schemaMgr.tableForType(schema(cacheName), cacheName, type.name());
if (tbl == null)
return;
if (tbl.luceneIndex() != null)
tbl.luceneIndex().remove(row.key());
tbl.table().remove(row);
}
/** {@inheritDoc} */
@Override public void dynamicIndexCreate(String schemaName, String tblName, QueryIndexDescriptorImpl idxDesc,
boolean ifNotExists, SchemaIndexCacheVisitor cacheVisitor) throws IgniteCheckedException {
schemaMgr.createIndex(schemaName, tblName, idxDesc, ifNotExists, cacheVisitor);
}
/** {@inheritDoc} */
@Override public void dynamicIndexDrop(String schemaName, String idxName, boolean ifExists)
throws IgniteCheckedException {
schemaMgr.dropIndex(schemaName, idxName, ifExists);
}
/** {@inheritDoc} */
@Override public void dynamicAddColumn(String schemaName, String tblName, List<QueryField> cols,
boolean ifTblExists, boolean ifColNotExists) throws IgniteCheckedException {
schemaMgr.addColumn(schemaName, tblName, cols, ifTblExists, ifColNotExists);
clearPlanCache();
}
/** {@inheritDoc} */
@Override public void dynamicDropColumn(String schemaName, String tblName, List<String> cols, boolean ifTblExists,
boolean ifColExists) throws IgniteCheckedException {
schemaMgr.dropColumn(schemaName, tblName, cols, ifTblExists, ifColExists);
clearPlanCache();
}
/**
* Create sorted index.
*
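* <p>
* On affinity nodes this creates a B+ tree backed {@link H2TreeIndex} (via the cache visitor for
* dynamic creation, or directly otherwise); on non-affinity (client) nodes a lightweight
* {@link H2TreeClientIndex} stub is returned instead.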
* @param name Index name.
* @param tbl Table.
* @param pk Primary key flag.
* @param affinityKey Affinity key flag.
* @param unwrappedCols Index columns with complex types unwrapped into their constituent fields.
* @param wrappedCols Index columns with complex types kept as-is.
* @param inlineSize Index inline size.
* @param cacheVisitor Cache visitor; {@code null} when the index is created together with a new cache,
*      non-null when the index is created dynamically on an existing cache.
* @return Index.
*/
@SuppressWarnings("ConstantConditions")
GridH2IndexBase createSortedIndex(String name, GridH2Table tbl, boolean pk, boolean affinityKey,
List<IndexColumn> unwrappedCols, List<IndexColumn> wrappedCols, int inlineSize, @Nullable SchemaIndexCacheVisitor cacheVisitor) {
GridCacheContextInfo cacheInfo = tbl.cacheInfo();
if (log.isDebugEnabled())
log.debug("Creating cache index [cacheId=" + cacheInfo.cacheId() + ", idxName=" + name + ']');
if (cacheInfo.affinityNode()) {
GridCacheContext<?, ?> cctx = cacheInfo.cacheContext();
GridQueryTypeDescriptor typeDesc = tbl.rowDescriptor().type();
int typeId = cctx.binaryMarshaller() ? typeDesc.typeId() : typeDesc.valueClass().hashCode();
String treeName = BPlusTree.treeName(typeId + "_" + name, "H2Tree");
List<IndexColumn> cols = ctx.indexProcessor().useUnwrappedPk(cctx, treeName) ? unwrappedCols : wrappedCols;
IndexKeyTypeSettings keyTypeSettings = new IndexKeyTypeSettings()
.stringOptimizedCompare(CompareMode.OFF.equals(tbl.getCompareMode().getName()))
.binaryUnsigned(tbl.getCompareMode().isBinaryUnsigned());
QueryIndexDefinition idxDef = new QueryIndexDefinition(
tbl.rowDescriptor(),
new IndexName(tbl.cacheName(), tbl.getSchema().getName(), tbl.getName(), name),
treeName,
ctx.indexProcessor().rowCacheCleaner(cacheInfo.groupId()),
pk,
affinityKey,
H2Utils.columnsToKeyDefinitions(tbl, cols),
inlineSize,
keyTypeSettings
);
org.apache.ignite.internal.cache.query.index.Index index;
if (cacheVisitor != null)
index = ctx.indexProcessor().createIndexDynamically(
tbl.cacheContext(), idxFactory, idxDef, cacheVisitor);
else
index = ctx.indexProcessor().createIndex(tbl.cacheContext(), idxFactory, idxDef);
InlineIndexImpl queryIndex = index.unwrap(InlineIndexImpl.class);
return new H2TreeIndex(queryIndex, tbl, unwrappedCols.toArray(new IndexColumn[0]), pk, log);
}
else {
ClientIndexDefinition d = new ClientIndexDefinition(
new IndexName(tbl.cacheName(), tbl.getSchema().getName(), tbl.getName(), name),
H2Utils.columnsToKeyDefinitions(tbl, unwrappedCols),
inlineSize,
tbl.cacheInfo().config().getSqlIndexMaxInlineSize());
org.apache.ignite.internal.cache.query.index.Index index =
ctx.indexProcessor().createIndex(tbl.cacheContext(), new ClientIndexFactory(log), d);
InlineIndex idx = index.unwrap(InlineIndex.class);
IndexType idxType = pk ? IndexType.createPrimaryKey(false, false) :
IndexType.createNonUnique(false, false, false);
return new H2TreeClientIndex(idx, tbl, name, unwrappedCols.toArray(new IndexColumn[0]), idxType);
}
}
/** {@inheritDoc} */
@Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText(String schemaName,
String cacheName, String qry, String typeName, IndexingQueryFilter filters, int limit) throws IgniteCheckedException {
H2TableDescriptor tbl = schemaMgr.tableForType(schemaName, cacheName, typeName);
if (tbl != null && tbl.luceneIndex() != null) {
long qryId = runningQueryManager().register(qry, TEXT, schemaName, true, null, null);
try {
return tbl.luceneIndex().query(qry.toUpperCase(), filters, limit);
}
finally {
runningQueryManager().unregister(qryId, null);
}
}
return new GridEmptyCloseableIterator<>();
}
/**
* Queries individual fields (generally used by JDBC drivers).
*
* @param qryId Query id.
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param select Select.
* @param filter Cache name and key filter.
* @param mvccTracker Query tracker.
* @param cancel Query cancel.
* @param inTx Flag indicating whether the query is executed within a transaction.
* @param timeout Timeout.
* @return Query result.
*/
private GridQueryFieldsResult executeSelectLocal(
long qryId,
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultSelect select,
final IndexingQueryFilter filter,
MvccQueryTracker mvccTracker,
GridQueryCancel cancel,
boolean inTx,
int timeout
) {
assert !select.mvccEnabled() || mvccTracker != null;
String qry;
if (select.forUpdate())
qry = inTx ? select.forUpdateQueryTx() : select.forUpdateQueryOutTx();
else
qry = qryDesc.sql();
boolean mvccEnabled = mvccTracker != null;
try {
assert select != null;
if (ctx.security().enabled())
checkSecurity(select.cacheIds());
MvccSnapshot mvccSnapshot = null;
if (mvccEnabled)
mvccSnapshot = mvccTracker.snapshot();
final QueryContext qctx = new QueryContext(
0,
filter,
null,
mvccSnapshot,
null,
true
);
return new GridQueryFieldsResultAdapter(select.meta(), null) {
@Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException {
H2PooledConnection conn = connections().connection(qryDesc.schemaName());
try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_ITER_OPEN, MTC.span()))) {
H2Utils.setupConnection(conn, qctx,
qryDesc.distributedJoins(), qryDesc.enforceJoinOrder(), qryParams.lazy());
PreparedStatement stmt = conn.prepareStatement(qry, H2StatementCache.queryFlags(qryDesc));
// Convert parameters into BinaryObjects.
Marshaller m = ctx.config().getMarshaller();
byte[] paramsBytes = U.marshal(m, qryParams.arguments());
final ClassLoader ldr = U.resolveClassLoader(ctx.config());
Object[] params;
if (m instanceof BinaryMarshaller) {
params = BinaryUtils.rawArrayFromBinary(((BinaryMarshaller)m).binaryMarshaller()
.unmarshal(paramsBytes, ldr));
}
else
params = U.unmarshal(m, paramsBytes, ldr);
H2Utils.bindParameters(stmt, F.asList(params));
H2QueryInfo qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry,
ctx.localNodeId(), qryId);
ResultSet rs = executeSqlQueryWithTimer(
stmt,
conn,
qry,
timeout,
cancel,
qryParams.dataPageScanEnabled(),
qryInfo
);
return new H2FieldsIterator(
rs,
mvccTracker,
conn,
qryParams.pageSize(),
log,
IgniteH2Indexing.this,
qryInfo,
ctx.tracing()
);
}
catch (IgniteCheckedException | RuntimeException | Error e) {
conn.close();
try {
if (mvccTracker != null)
mvccTracker.onDone();
}
catch (Exception e0) {
e.addSuppressed(e0);
}
throw e;
}
}
};
}
catch (Exception e) {
GridNearTxLocal tx = null;
if (mvccEnabled && (tx != null || (tx = tx(ctx)) != null))
tx.setRollbackOnly();
throw e;
}
}
/**
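* Computes the effective operation timeout from the query timeout and the transaction's remaining time.
* <p>
* For example (illustrative values): with {@code qryTimeout = 3000} and a transaction that has
* {@code remainingTime() == 1000}, the result is {@code min(1000, 3000) == 1000}; when either value
* is non-positive, {@code max(remaining, qryTimeout)} is used so that a sole positive timeout wins.
*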
* @param qryTimeout Query timeout in milliseconds.
* @param tx Transaction.
* @return Timeout for operation in milliseconds based on query and tx timeouts.
*/
public static int operationTimeout(int qryTimeout, IgniteTxAdapter tx) {
if (tx != null) {
int remaining = (int)tx.remainingTime();
return remaining > 0 && qryTimeout > 0 ? min(remaining, qryTimeout) : max(remaining, qryTimeout);
}
return qryTimeout;
}
/** {@inheritDoc} */
@Override public long streamUpdateQuery(
String schemaName,
String qry,
@Nullable Object[] params,
IgniteDataStreamer<?, ?> streamer,
String qryInitiatorId
) throws IgniteCheckedException {
QueryParserResultDml dml = streamerParse(schemaName, qry);
return streamQuery0(qry, schemaName, streamer, dml, params, qryInitiatorId);
}
/** {@inheritDoc} */
@SuppressWarnings({"ForLoopReplaceableByForEach", "ConstantConditions"})
@Override public List<Long> streamBatchedUpdateQuery(
String schemaName,
String qry,
List<Object[]> params,
SqlClientContext cliCtx,
String qryInitiatorId
) throws IgniteCheckedException {
if (cliCtx == null || !cliCtx.isStream()) {
U.warn(log, "Connection is not in streaming mode.");
return zeroBatchedStreamedUpdateResult(params.size());
}
QueryParserResultDml dml = streamerParse(schemaName, qry);
IgniteDataStreamer<?, ?> streamer = cliCtx.streamerForCache(dml.streamTable().cacheName());
assert streamer != null;
List<Long> ress = new ArrayList<>(params.size());
for (int i = 0; i < params.size(); i++) {
long res = streamQuery0(qry, schemaName, streamer, dml, params.get(i), qryInitiatorId);
ress.add(res);
}
return ress;
}
/**
* Perform given statement against given data streamer. Only row-based INSERT is supported.
*
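* <p>
* A single-row INSERT feeds the streamer via {@code streamer.addData(key, val)}; a multi-row
* INSERT is first collected into a {@link LinkedHashMap} keyed by cache key and fed via
* {@code streamer.addData(map)}, so the returned count is the number of distinct keys.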
* @param qry Query.
* @param schemaName Schema name.
* @param streamer Streamer to feed data to.
* @param dml DML statement.
* @param args Statement arguments.
* @return Number of rows in given INSERT statement.
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings({"unchecked"})
private long streamQuery0(String qry, String schemaName, IgniteDataStreamer streamer, QueryParserResultDml dml,
final Object[] args, String qryInitiatorId) throws IgniteCheckedException {
long qryId = runningQueryManager().register(
QueryUtils.INCLUDE_SENSITIVE ? qry : sqlWithoutConst(dml.statement()),
GridCacheQueryType.SQL_FIELDS,
schemaName,
true,
null,
qryInitiatorId
);
Exception failReason = null;
try {
UpdatePlan plan = dml.plan();
Iterator<List<?>> iter = new GridQueryCacheObjectsIterator(updateQueryRows(qryId, schemaName, plan, args),
objectContext(), true);
if (!iter.hasNext())
return 0;
IgniteBiTuple<?, ?> t = plan.processRow(iter.next());
if (!iter.hasNext()) {
streamer.addData(t.getKey(), t.getValue());
return 1;
}
else {
Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount());
rows.put(t.getKey(), t.getValue());
while (iter.hasNext()) {
List<?> row = iter.next();
t = plan.processRow(row);
rows.put(t.getKey(), t.getValue());
}
streamer.addData(rows);
return rows.size();
}
}
catch (IgniteException | IgniteCheckedException e) {
failReason = e;
throw e;
}
finally {
runningQueryManager().unregister(qryId, failReason);
}
}
/**
* Calculates rows for update query.
*
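* <p>
* For statements backed by a SELECT (e.g. {@code INSERT ... SELECT}) the rows come from a local
* execution of {@link UpdatePlan#selectQuery()}; for plain {@code VALUES}-based statements they
* are produced directly by {@link UpdatePlan#createRows}.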
* @param qryId Query id.
* @param schemaName Schema name.
* @param plan Update plan.
* @param args Statement arguments.
* @return Rows for update.
* @throws IgniteCheckedException If failed.
*/
private Iterator<List<?>> updateQueryRows(long qryId, String schemaName, UpdatePlan plan, Object[] args)
throws IgniteCheckedException {
Object[] params = args != null ? args : X.EMPTY_OBJECT_ARRAY;
if (!F.isEmpty(plan.selectQuery())) {
SqlFieldsQuery selectQry = new SqlFieldsQuery(plan.selectQuery())
.setArgs(params)
.setLocal(true);
QueryParserResult selectParseRes = parser.parse(schemaName, selectQry, false);
GridQueryFieldsResult res = executeSelectLocal(
qryId,
selectParseRes.queryDescriptor(),
selectParseRes.queryParameters(),
selectParseRes.select(),
null,
null,
null,
false,
0
);
return res.iterator();
}
else
return plan.createRows(params).iterator();
}
/**
* Parse statement for streamer.
*
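* <p>
* For example (illustrative statements): {@code INSERT INTO Person(_key, name) VALUES (?, ?)} is
* streamable, while {@code INSERT INTO Person SELECT * FROM Backup} or any UPDATE/DELETE statement
* is rejected with {@link IgniteQueryErrorCode#UNSUPPORTED_OPERATION}.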
* @param schemaName Schema name.
* @param qry Query.
* @return DML.
*/
private QueryParserResultDml streamerParse(String schemaName, String qry) {
QueryParserResult parseRes = parser.parse(schemaName, new SqlFieldsQuery(qry), false);
QueryParserResultDml dml = parseRes.dml();
if (dml == null || !dml.streamable()) {
throw new IgniteSQLException("Streaming mode supports only INSERT commands without subqueries.",
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
}
return dml;
}
/**
* @param size Result size.
* @return List of given size filled with 0Ls.
*/
private static List<Long> zeroBatchedStreamedUpdateResult(int size) {
Long[] res = new Long[size];
Arrays.fill(res, 0L);
return Arrays.asList(res);
}
/**
* Executes SQL query statement.
*
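* <p>
* A registered {@link GridQueryCancel} hook invokes {@code Statement.cancel()}; H2 then reports
* {@code STATEMENT_WAS_CANCELED}, which is translated here into a {@link QueryCancelledException}.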
* @param conn Connection.
* @param stmt Statement.
* @param timeoutMillis Query timeout.
* @param cancel Query cancel.
* @return Result.
* @throws IgniteCheckedException If failed.
*/
private ResultSet executeSqlQuery(final H2PooledConnection conn, final PreparedStatement stmt,
int timeoutMillis, @Nullable GridQueryCancel cancel) throws IgniteCheckedException {
if (cancel != null)
cancel.add(() -> cancelStatement(stmt));
Session ses = session(conn);
if (timeoutMillis >= 0)
ses.setQueryTimeout(timeoutMillis);
else
ses.setQueryTimeout((int)distrCfg.defaultQueryTimeout());
try {
return stmt.executeQuery();
}
catch (SQLException e) {
// Throw special exception.
if (e.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
throw new QueryCancelledException();
if (e.getCause() instanceof IgniteSQLException)
throw (IgniteSQLException)e.getCause();
throw new IgniteSQLException(e);
}
}
/**
* Cancel prepared statement.
*
* @param stmt Statement.
*/
private static void cancelStatement(PreparedStatement stmt) {
try {
stmt.cancel();
}
catch (SQLException ignored) {
// No-op.
}
}
/**
* Executes SQL query and prints a warning if the query is too slow.
*
* @param conn Connection.
* @param sql SQL query.
* @param params Parameters.
* @param timeoutMillis Query timeout.
* @param cancel Query cancel.
* @param dataPageScanEnabled If data page scan is enabled.
* @return Result.
* @throws IgniteCheckedException If failed.
*/
public ResultSet executeSqlQueryWithTimer(
H2PooledConnection conn,
String sql,
@Nullable Collection<Object> params,
int timeoutMillis,
@Nullable GridQueryCancel cancel,
Boolean dataPageScanEnabled,
final H2QueryInfo qryInfo
) throws IgniteCheckedException {
PreparedStatement stmt = conn.prepareStatementNoCache(sql);
H2Utils.bindParameters(stmt, params);
return executeSqlQueryWithTimer(stmt,
conn, sql, timeoutMillis, cancel, dataPageScanEnabled, qryInfo);
}
/**
* @param dataPageScanEnabled If data page scan is enabled.
*/
public void enableDataPageScan(Boolean dataPageScanEnabled) {
// Data page scan is enabled by default for SQL.
// TODO https://issues.apache.org/jira/browse/IGNITE-11998
CacheDataTree.setDataPageScanEnabled(false);
}
/**
* Executes SQL query and prints a warning if the query is too slow.
*
* @param stmt Prepared statement for query.
* @param conn Connection.
* @param sql SQL query.
* @param timeoutMillis Query timeout.
* @param cancel Query cancel.
* @param dataPageScanEnabled If data page scan is enabled.
* @return Result.
* @throws IgniteCheckedException If failed.
*/
public ResultSet executeSqlQueryWithTimer(
PreparedStatement stmt,
H2PooledConnection conn,
String sql,
int timeoutMillis,
@Nullable GridQueryCancel cancel,
Boolean dataPageScanEnabled,
final H2QueryInfo qryInfo
) throws IgniteCheckedException {
if (qryInfo != null)
longRunningQryMgr.registerQuery(qryInfo);
enableDataPageScan(dataPageScanEnabled);
try (
TraceSurroundings ignored = MTC.support(ctx.tracing()
.create(SQL_QRY_EXECUTE, MTC.span())
.addTag(SQL_QRY_TEXT, () -> sql))
) {
ResultSet rs = executeSqlQuery(conn, stmt, timeoutMillis, cancel);
if (qryInfo != null && qryInfo.time() > longRunningQryMgr.getTimeout())
qryInfo.printLogMessage(log, "Long running query is finished", null);
return rs;
}
catch (Throwable e) {
if (qryInfo != null && qryInfo.time() > longRunningQryMgr.getTimeout()) {
qryInfo.printLogMessage(log, "Long running query is finished with error: "
+ e.getMessage(), null);
}
throw e;
}
finally {
CacheDataTree.setDataPageScanEnabled(false);
if (qryInfo != null)
longRunningQryMgr.unregisterQuery(qryInfo);
}
}
/** {@inheritDoc} */
@SuppressWarnings("deprecation")
@Override public SqlFieldsQuery generateFieldsQuery(String cacheName, SqlQuery qry) {
String schemaName = schema(cacheName);
String type = qry.getType();
H2TableDescriptor tblDesc = schemaMgr.tableForType(schemaName, cacheName, type);
if (tblDesc == null)
throw new IgniteSQLException("Failed to find SQL table for type: " + type,
IgniteQueryErrorCode.TABLE_NOT_FOUND);
String sql;
try {
sql = generateFieldsQueryString(qry.getSql(), qry.getAlias(), tblDesc);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
SqlFieldsQuery res = QueryUtils.withQueryTimeout(new SqlFieldsQuery(sql), qry.getTimeout(), TimeUnit.MILLISECONDS);
res.setArgs(qry.getArgs());
res.setDistributedJoins(qry.isDistributedJoins());
res.setLocal(qry.isLocal());
res.setPageSize(qry.getPageSize());
res.setPartitions(qry.getPartitions());
res.setReplicatedOnly(qry.isReplicatedOnly());
res.setSchema(schemaName);
res.setSql(sql);
return res;
}
/**
* Execute command.
*
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param cliCtx Client context.
* @param cmd Command (native).
* @return Result.
*/
private FieldsQueryCursor<List<?>> executeCommand(
QueryDescriptor qryDesc,
QueryParameters qryParams,
@Nullable SqlClientContext cliCtx,
QueryParserResultCommand cmd
) {
if (cmd.noOp())
return zeroCursor();
SqlCommand cmdNative = cmd.commandNative();
GridSqlStatement cmdH2 = cmd.commandH2();
if (qryDesc.local()) {
throw new IgniteSQLException("DDL statements are not supported for execution on local nodes only",
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
}
long qryId = registerRunningQuery(qryDesc, qryParams, null, null);
CommandResult res = null;
Exception failReason = null;
try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_CMD_QRY_EXECUTE, MTC.span()))) {
res = cmdProc.runCommand(qryDesc.sql(), cmdNative, cmdH2, qryParams, cliCtx, qryId);
return res.cursor();
}
catch (IgniteException e) {
failReason = e;
throw e;
}
catch (IgniteCheckedException e) {
failReason = e;
throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qryDesc.sql() +
", err=" + e.getMessage() + ']', e);
}
finally {
if (res == null || res.unregisterRunningQuery())
runningQueryManager().unregister(qryId, failReason);
}
}
/**
* Check whether command could be executed with the given cluster state.
*
* @param parseRes Parsing result.
*/
private void checkClusterState(QueryParserResult parseRes) {
if (!ctx.state().publicApiActiveState(true)) {
if (parseRes.isCommand()) {
QueryParserResultCommand cmd = parseRes.command();
assert cmd != null;
SqlCommand cmd0 = cmd.commandNative();
if (cmd0 instanceof SqlCommitTransactionCommand || cmd0 instanceof SqlRollbackTransactionCommand)
return;
}
throw new IgniteException("Can not perform the operation because the cluster is inactive. Note, " +
"that the cluster is considered inactive by default if Ignite Persistent Store is used to " +
"let all the nodes join the cluster. To activate the cluster call Ignite.active(true).");
}
}
/** {@inheritDoc} */
@SuppressWarnings({"StringEquality"})
@Override public List<FieldsQueryCursor<List<?>>> querySqlFields(
String schemaName,
SqlFieldsQuery qry,
@Nullable SqlClientContext cliCtx,
boolean keepBinary,
boolean failOnMultipleStmts,
GridQueryCancel cancel
) {
try {
List<FieldsQueryCursor<List<?>>> res = new ArrayList<>(1);
SqlFieldsQuery remainingQry = qry;
while (remainingQry != null) {
Span qrySpan = ctx.tracing().create(SQL_QRY, MTC.span())
.addTag(SQL_SCHEMA, () -> schemaName);
try (TraceSurroundings ignored = MTC.supportContinual(qrySpan)) {
// Parse.
QueryParserResult parseRes = parser.parse(schemaName, remainingQry, !failOnMultipleStmts);
qrySpan.addTag(SQL_QRY_TEXT, () -> parseRes.queryDescriptor().sql());
remainingQry = parseRes.remainingQuery();
// Get next command.
QueryDescriptor newQryDesc = parseRes.queryDescriptor();
QueryParameters newQryParams = parseRes.queryParameters();
// Check if there is enough parameters. Batched statements are not checked at this point
// since they pass parameters differently.
if (!newQryDesc.batched()) {
int qryParamsCnt = F.isEmpty(newQryParams.arguments()) ? 0 : newQryParams.arguments().length;
if (qryParamsCnt < parseRes.parametersCount())
throw new IgniteSQLException("Invalid number of query parameters [expected=" +
parseRes.parametersCount() + ", actual=" + qryParamsCnt + ']');
}
// Check if cluster state is valid.
checkClusterState(parseRes);
// Execute.
if (parseRes.isCommand()) {
QueryParserResultCommand cmd = parseRes.command();
assert cmd != null;
FieldsQueryCursor<List<?>> cmdRes = executeCommand(
newQryDesc,
newQryParams,
cliCtx,
cmd
);
res.add(cmdRes);
}
else if (parseRes.isDml()) {
QueryParserResultDml dml = parseRes.dml();
assert dml != null;
List<? extends FieldsQueryCursor<List<?>>> dmlRes = executeDml(
newQryDesc,
newQryParams,
dml,
cancel
);
res.addAll(dmlRes);
}
else {
assert parseRes.isSelect();
QueryParserResultSelect select = parseRes.select();
assert select != null;
List<? extends FieldsQueryCursor<List<?>>> qryRes = executeSelect(
newQryDesc,
newQryParams,
select,
keepBinary,
cancel
);
res.addAll(qryRes);
}
}
catch (Throwable th) {
qrySpan.addTag(ERROR, th::getMessage).end();
throw th;
}
}
return res;
}
catch (RuntimeException | Error e) {
GridNearTxLocal tx = ctx.cache().context().tm().tx();
if (tx != null && tx.mvccSnapshot() != null &&
(!(e instanceof IgniteSQLException) || /* Parsing errors should not rollback Tx. */
((IgniteSQLException)e).sqlState() != SqlStateCode.PARSING_EXCEPTION))
tx.setRollbackOnly();
throw e;
}
}
/**
* Execute an all-ready {@link SqlFieldsQuery}.
*
* @param qryDesc Query descriptor.
* @param qryParams Parameters.
* @param dml DML.
* @param cancel Query cancel state holder.
* @return Query result.
*/
private List<? extends FieldsQueryCursor<List<?>>> executeDml(
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultDml dml,
GridQueryCancel cancel
) {
IndexingQueryFilter filter = (qryDesc.local() ? backupFilter(null, qryParams.partitions()) : null);
long qryId = registerRunningQuery(qryDesc, qryParams, cancel, dml.statement());
Exception failReason = null;
try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_DML_QRY_EXECUTE, MTC.span()))) {
if (!dml.mvccEnabled() && !updateInTxAllowed && ctx.cache().context().tm().inUserTx()) {
throw new IgniteSQLException("DML statements are not allowed inside a transaction over " +
"cache(s) with TRANSACTIONAL atomicity mode (change atomicity mode to " +
"TRANSACTIONAL_SNAPSHOT or disable this error message with system property " +
"\"-DIGNITE_ALLOW_DML_INSIDE_TRANSACTION=true\")");
}
if (!qryDesc.local()) {
return executeUpdateDistributed(
qryId,
qryDesc,
qryParams,
dml,
cancel
);
}
else {
UpdateResult updRes = executeUpdate(
qryId,
qryDesc,
qryParams,
dml,
true,
filter,
cancel
);
return singletonList(new QueryCursorImpl<>(new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
return new IgniteSingletonIterator<>(singletonList(updRes.counter()));
}
}, cancel, true, false));
}
}
catch (IgniteException e) {
failReason = e;
throw e;
}
catch (IgniteCheckedException e) {
failReason = e;
IgniteClusterReadOnlyException roEx = X.cause(e, IgniteClusterReadOnlyException.class);
if (roEx != null) {
throw new IgniteSQLException(
"Failed to execute DML statement. Cluster in read-only mode [stmt=" + qryDesc.sql() +
", params=" + Arrays.deepToString(qryParams.arguments()) + "]",
IgniteQueryErrorCode.CLUSTER_READ_ONLY_MODE_ENABLED,
e
);
}
throw new IgniteSQLException("Failed to execute DML statement [stmt=" + qryDesc.sql() +
", params=" + Arrays.deepToString(qryParams.arguments()) + "]", e);
}
finally {
runningQueryManager().unregister(qryId, failReason);
}
}
/**
* Execute an all-ready {@link SqlFieldsQuery}.
*
* @param qryDesc Query descriptor.
* @param qryParams Parameters.
* @param select Select.
* @param keepBinary Whether binary objects must not be deserialized automatically.
* @param cancel Query cancel state holder.
* @return Query result.
*/
private List<? extends FieldsQueryCursor<List<?>>> executeSelect(
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultSelect select,
boolean keepBinary,
GridQueryCancel cancel
) {
assert cancel != null;
// Register query.
long qryId = registerRunningQuery(qryDesc, qryParams, cancel, select.statement());
try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_CURSOR_OPEN, MTC.span()))) {
GridNearTxLocal tx = null;
MvccQueryTracker tracker = null;
GridCacheContext mvccCctx = null;
boolean inTx = false;
if (select.mvccEnabled()) {
mvccCctx = ctx.cache().context().cacheContext(select.mvccCacheId());
if (mvccCctx == null)
throw new IgniteCheckedException("Cache has been stopped concurrently [cacheId=" +
select.mvccCacheId() + ']');
boolean autoStartTx = !qryParams.autoCommit() && tx(ctx) == null;
// Start new user tx in case of autocommit == false.
if (autoStartTx)
txStart(ctx, qryParams.timeout());
tx = tx(ctx);
checkActive(tx);
inTx = tx != null;
tracker = MvccUtils.mvccTracker(mvccCctx, tx);
}
int timeout = operationTimeout(qryParams.timeout(), tx);
Iterable<List<?>> iter = executeSelect0(
qryId,
qryDesc,
qryParams,
select,
keepBinary,
tracker,
cancel,
inTx,
timeout);
// Execute SELECT FOR UPDATE if needed.
if (select.forUpdate() && inTx)
iter = lockSelectedRows(iter, mvccCctx, qryParams.pageSize(), timeout);
RegisteredQueryCursor<List<?>> cursor = new RegisteredQueryCursor<>(iter, cancel, runningQueryManager(),
qryParams.lazy(), qryId, ctx.tracing());
cancel.add(cursor::cancel);
cursor.fieldsMeta(select.meta());
cursor.partitionResult(select.twoStepQuery() != null ? select.twoStepQuery().derivedPartitions() : null);
return singletonList(cursor);
}
catch (Exception e) {
runningQueryManager().unregister(qryId, e);
if (e instanceof IgniteCheckedException)
throw U.convertException((IgniteCheckedException)e);
if (e instanceof RuntimeException)
throw (RuntimeException)e;
throw new IgniteSQLException("Failed to execute SELECT statement: " + qryDesc.sql(), e);
}
}
/**
* Execute SELECT statement for DML.
*
* @param qryId Query id.
* @param schema Schema.
* @param selectQry Select query.
* @param mvccTracker MVCC tracker.
* @param cancel Cancel.
* @param timeout Timeout.
* @return Fields query.
*/
private QueryCursorImpl<List<?>> executeSelectForDml(
long qryId,
String schema,
SqlFieldsQuery selectQry,
MvccQueryTracker mvccTracker,
GridQueryCancel cancel,
int timeout
) {
QueryParserResult parseRes = parser.parse(schema, selectQry, false);
QueryParserResultSelect select = parseRes.select();
assert select != null;
Iterable<List<?>> iter = executeSelect0(
qryId,
parseRes.queryDescriptor(),
parseRes.queryParameters(),
select,
true,
mvccTracker,
cancel,
false,
timeout
);
QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(iter, cancel, true, parseRes.queryParameters().lazy());
cursor.fieldsMeta(select.meta());
cursor.partitionResult(select.twoStepQuery() != null ? select.twoStepQuery().derivedPartitions() : null);
return cursor;
}
/**
* Execute an all-ready {@link SqlFieldsQuery}.
*
* @param qryId Query id.
* @param qryDesc Query descriptor.
* @param qryParams Parameters.
* @param select Select.
* @param keepBinary Whether binary objects must not be deserialized automatically.
* @param mvccTracker MVCC tracker.
* @param cancel Query cancel state holder.
* @param inTx Flag indicating whether the query is executed within a transaction.
* @param timeout Timeout.
* @return Query result.
*/
private Iterable<List<?>> executeSelect0(
long qryId,
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultSelect select,
boolean keepBinary,
MvccQueryTracker mvccTracker,
GridQueryCancel cancel,
boolean inTx,
int timeout
) {
assert !select.mvccEnabled() || mvccTracker != null;
// Check security.
if (ctx.security().enabled())
checkSecurity(select.cacheIds());
Iterable<List<?>> iter;
if (select.splitNeeded()) {
// Distributed query.
GridCacheTwoStepQuery twoStepQry = select.forUpdate() && inTx ?
select.forUpdateTwoStepQuery() : select.twoStepQuery();
assert twoStepQry != null;
iter = executeSelectDistributed(
qryId,
qryDesc,
qryParams,
twoStepQry,
keepBinary,
mvccTracker,
cancel,
timeout
);
}
else {
// Local query.
IndexingQueryFilter filter = (qryDesc.local() ? backupFilter(null, qryParams.partitions()) : null);
GridQueryFieldsResult res = executeSelectLocal(
qryId,
qryDesc,
qryParams,
select,
filter,
mvccTracker,
cancel,
inTx,
timeout
);
iter = () -> {
try {
return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), keepBinary);
}
catch (IgniteCheckedException | IgniteSQLException e) {
throw new CacheException(e);
}
};
}
return iter;
}
/**
* Locks rows from query cursor and returns the select result.
*
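* <p>
* Each cursor row is expected to carry the {@code _key} as its last column; the keys are enlisted
* into the current transaction through {@link GridNearTxLocal#updateAsync} with
* {@link EnlistOperation#LOCK}, and at most {@code TX_SIZE_THRESHOLD} rows may be accumulated.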
* @param cur Query cursor.
* @param cctx Cache context.
* @param pageSize Page size.
* @param timeout Timeout.
* @return Query results cursor.
*/
private Iterable<List<?>> lockSelectedRows(Iterable<List<?>> cur, GridCacheContext cctx, int pageSize, long timeout) {
assert cctx != null && cctx.mvccEnabled();
GridNearTxLocal tx = tx(ctx);
if (tx == null)
throw new IgniteSQLException("Failed to perform SELECT FOR UPDATE operation: transaction has already finished.");
Collection<List<?>> rowsCache = new ArrayList<>();
UpdateSourceIterator srcIt = new UpdateSourceIterator<KeyCacheObject>() {
private Iterator<List<?>> it = cur.iterator();
@Override public EnlistOperation operation() {
return EnlistOperation.LOCK;
}
@Override public boolean hasNextX() throws IgniteCheckedException {
return it.hasNext();
}
@Override public KeyCacheObject nextX() throws IgniteCheckedException {
List<?> res = it.next();
// nextX() can be called from different threads.
synchronized (rowsCache) {
rowsCache.add(res.subList(0, res.size() - 1));
if (rowsCache.size() > TX_SIZE_THRESHOLD) {
throw new IgniteCheckedException("Too many rows are locked by SELECT FOR UPDATE statement. " +
"Consider locking fewer keys or increase the limit by setting a " +
IGNITE_MVCC_TX_SIZE_CACHING_THRESHOLD + " system property. Current value is " +
TX_SIZE_THRESHOLD + " rows.");
}
}
// The last column is expected to be the _key.
return cctx.toCacheKeyObject(res.get(res.size() - 1));
}
};
IgniteInternalFuture<Long> fut = tx.updateAsync(cctx, srcIt, pageSize,
timeout, true);
try {
fut.get();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
return rowsCache;
}
/**
* Register running query.
*
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param cancel Query cancel state holder.
* @param stmnt Parsed statement.
* @return Id of the registered query.
*/
private long registerRunningQuery(
QueryDescriptor qryDesc,
QueryParameters qryParams,
GridQueryCancel cancel,
@Nullable GridSqlStatement stmnt
) {
String qry = QueryUtils.INCLUDE_SENSITIVE || stmnt == null ? qryDesc.sql() : sqlWithoutConst(stmnt);
long res = runningQueryManager().register(
qry,
GridCacheQueryType.SQL_FIELDS,
qryDesc.schemaName(),
qryDesc.local(),
cancel,
qryDesc.queryInitiatorId()
);
if (ctx.event().isRecordable(EVT_SQL_QUERY_EXECUTION)) {
ctx.event().record(new SqlQueryExecutionEvent(
ctx.discovery().localNode(),
GridCacheQueryType.SQL_FIELDS.name() + " query execution.",
qry,
qryParams.arguments(),
ctx.security().enabled() ? ctx.security().securityContext().subject().id() : null));
}
return res;
}
/**
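* Renders a statement with constant values masked.
* <p>
* For example (illustrative): {@code SELECT NAME FROM PERSON WHERE ID = 42} is rendered as
* {@code SELECT NAME FROM PERSON WHERE ID = ?}, keeping sensitive literals out of query views and logs.
*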
* @param stmnt Statement to print.
* @return SQL query with constants replaced by the '?' char.
* @see GridSqlConst#getSQL()
* @see QueryUtils#includeSensitive()
*/
private String sqlWithoutConst(GridSqlStatement stmnt) {
QueryUtils.INCLUDE_SENSITIVE_TL.set(false);
try {
return stmnt.getSQL();
}
finally {
QueryUtils.INCLUDE_SENSITIVE_TL.set(true);
}
}
/**
* Check security access for caches.
*
* @param cacheIds Cache IDs.
*/
private void checkSecurity(Collection<Integer> cacheIds) {
if (F.isEmpty(cacheIds))
return;
for (Integer cacheId : cacheIds) {
DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheId);
if (desc != null)
ctx.security().authorize(desc.cacheName(), SecurityPermission.CACHE_READ);
}
}
/** {@inheritDoc} */
@Override public UpdateSourceIterator<?> executeUpdateOnDataNodeTransactional(
GridCacheContext<?, ?> cctx,
int[] ids,
int[] parts,
String schema,
String qry,
Object[] params,
int flags,
int pageSize,
int timeout,
AffinityTopologyVersion topVer,
MvccSnapshot mvccSnapshot,
GridQueryCancel cancel
) {
SqlFieldsQuery fldsQry = QueryUtils.withQueryTimeout(new SqlFieldsQuery(qry), timeout, TimeUnit.MILLISECONDS);
if (params != null)
fldsQry.setArgs(params);
fldsQry.setEnforceJoinOrder(U.isFlagSet(flags, GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER));
fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS);
fldsQry.setPageSize(pageSize);
fldsQry.setLocal(true);
fldsQry.setLazy(U.isFlagSet(flags, GridH2QueryRequest.FLAG_LAZY));
boolean loc = true;
final boolean replicated = U.isFlagSet(flags, GridH2QueryRequest.FLAG_REPLICATED);
GridCacheContext<?, ?> cctx0;
if (!replicated
&& !F.isEmpty(ids)
&& (cctx0 = CU.firstPartitioned(cctx.shared(), ids)) != null
&& cctx0.config().getQueryParallelism() > 1) {
fldsQry.setDistributedJoins(true);
loc = false;
}
QueryParserResult parseRes = parser.parse(schema, fldsQry, false);
assert parseRes.remainingQuery() == null;
QueryParserResultDml dml = parseRes.dml();
assert dml != null;
IndexingQueryFilter filter = backupFilter(topVer, parts);
UpdatePlan plan = dml.plan();
GridCacheContext planCctx = plan.cacheContext();
// Force keepBinary for operation context to avoid binary deserialization inside entry processor
DmlUtils.setKeepBinaryContext(planCctx);
SqlFieldsQuery selectFieldsQry = QueryUtils.withQueryTimeout(
new SqlFieldsQuery(plan.selectQuery(), fldsQry.isCollocated()),
fldsQry.getTimeout(),
TimeUnit.MILLISECONDS
)
.setArgs(fldsQry.getArgs())
.setDistributedJoins(fldsQry.isDistributedJoins())
.setEnforceJoinOrder(fldsQry.isEnforceJoinOrder())
.setLocal(fldsQry.isLocal())
.setPageSize(fldsQry.getPageSize())
.setTimeout(fldsQry.getTimeout(), TimeUnit.MILLISECONDS)
.setLazy(fldsQry.isLazy());
QueryCursorImpl<List<?>> cur;
// Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual
// sub-query and not some dummy stuff like "select 1, 2, 3;"
if (!loc && !plan.isLocalSubquery()) {
cur = executeSelectForDml(
RunningQueryManager.UNDEFINED_QUERY_ID,
schema,
selectFieldsQry,
new StaticMvccQueryTracker(planCctx, mvccSnapshot),
cancel,
timeout
);
}
else {
selectFieldsQry.setLocal(true);
QueryParserResult selectParseRes = parser.parse(schema, selectFieldsQry, false);
GridQueryFieldsResult res = executeSelectLocal(
RunningQueryManager.UNDEFINED_QUERY_ID,
selectParseRes.queryDescriptor(),
selectParseRes.queryParameters(),
selectParseRes.select(),
filter,
new StaticMvccQueryTracker(planCctx, mvccSnapshot),
cancel,
true,
timeout
);
cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
try {
return res.iterator();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
}, cancel, true, selectParseRes.queryParameters().lazy());
}
return plan.iteratorForTransaction(connMgr, cur);
}
/**
* Run distributed query on detected set of partitions.
*
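* <p>
* The effective partition set is computed from the explicit {@code qryParams.partitions()} and the
* query's derived partitions; an empty result short-circuits to an empty iterator without ever
* contacting map nodes.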
* @param qryId Query id.
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param twoStepQry Two-step query.
* @param keepBinary Keep binary flag.
* @param mvccTracker Query tracker.
* @param cancel Cancel handler.
* @param timeout Timeout.
* @return Cursor representing distributed query result.
*/
@SuppressWarnings("IfMayBeConditional")
private Iterable<List<?>> executeSelectDistributed(
final long qryId,
final QueryDescriptor qryDesc,
final QueryParameters qryParams,
final GridCacheTwoStepQuery twoStepQry,
final boolean keepBinary,
MvccQueryTracker mvccTracker,
final GridQueryCancel cancel,
int timeout
) {
// When explicit partitions are set, there must be an owning cache they should be applied to.
PartitionResult derivedParts = twoStepQry.derivedPartitions();
final int[] parts = PartitionResult.calculatePartitions(
qryParams.partitions(),
derivedParts,
qryParams.arguments()
);
Iterable<List<?>> iter;
if (parts != null && parts.length == 0) {
iter = new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
return new Iterator<List<?>>() {
@Override public boolean hasNext() {
return false;
}
@SuppressWarnings("IteratorNextCanNotThrowNoSuchElementException")
@Override public List<?> next() {
return null;
}
};
}
};
}
else {
assert !twoStepQry.mvccEnabled() || !F.isEmpty(twoStepQry.cacheIds());
assert twoStepQry.mvccEnabled() == (mvccTracker != null);
iter = new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_ITER_OPEN, MTC.span()))) {
return IgniteH2Indexing.this.rdcQryExec.query(
qryId,
qryDesc.schemaName(),
twoStepQry,
keepBinary,
qryDesc.enforceJoinOrder(),
timeout,
cancel,
qryParams.arguments(),
parts,
qryParams.lazy(),
mvccTracker,
qryParams.dataPageScanEnabled(),
qryParams.pageSize()
);
}
catch (Throwable e) {
if (mvccTracker != null)
mvccTracker.onDone();
throw e;
}
}
};
}
return iter;
}
/**
* Executes DML request on map node. Happens only for "skip reducer" mode.
*
* @param schemaName Schema name.
* @param qry Query.
* @param filter Filter.
* @param cancel Cancel state.
* @param loc Locality flag.
* @return Update result.
* @throws IgniteCheckedException if failed.
*/
public UpdateResult executeUpdateOnDataNode(
String schemaName,
SqlFieldsQuery qry,
IndexingQueryFilter filter,
GridQueryCancel cancel,
boolean loc
) throws IgniteCheckedException {
QueryParserResult parseRes = parser.parse(schemaName, qry, false);
assert parseRes.remainingQuery() == null;
QueryParserResultDml dml = parseRes.dml();
assert dml != null;
return executeUpdate(
RunningQueryManager.UNDEFINED_QUERY_ID,
parseRes.queryDescriptor(),
parseRes.queryParameters(),
dml,
loc,
filter,
cancel
);
}
/**
* Registers new class description.
*
* This implementation doesn't support type reregistration.
*
* @param cacheInfo Cache context info.
* @param type Type description.
* @param isSql {@code true} if the table has been created via SQL.
* @return Always {@code true} for this implementation.
* @throws IgniteCheckedException In case of error.
*/
@Override public boolean registerType(GridCacheContextInfo cacheInfo, GridQueryTypeDescriptor type, boolean isSql)
throws IgniteCheckedException {
validateTypeDescriptor(type);
schemaMgr.onCacheTypeCreated(cacheInfo, this, type, isSql);
return true;
}
/** {@inheritDoc} */
@Override public GridCacheContextInfo registeredCacheInfo(String cacheName) {
for (H2TableDescriptor tbl : schemaMgr.tablesForCache(cacheName)) {
if (F.eq(tbl.cacheName(), cacheName))
return tbl.cacheInfo();
}
return null;
}
/** {@inheritDoc} */
@Override public void closeCacheOnClient(String cacheName) {
GridCacheContextInfo cacheInfo = registeredCacheInfo(cacheName);
// Only for SQL caches.
if (cacheInfo != null) {
parser.clearCache();
cacheInfo.clearCacheContext();
}
}
/** {@inheritDoc} */
@Override public String schema(String cacheName) {
return schemaMgr.schemaName(cacheName);
}
/** {@inheritDoc} */
@Override public Set<String> schemasNames() {
return schemaMgr.schemaNames();
}
/** {@inheritDoc} */
@Override public Collection<TableInformation> tablesInformation(String schemaNamePtrn, String tblNamePtrn,
String... tblTypes) {
Set<String> types = F.isEmpty(tblTypes) ? Collections.emptySet() : new HashSet<>(Arrays.asList(tblTypes));
Collection<TableInformation> infos = new ArrayList<>();
boolean allTypes = F.isEmpty(tblTypes);
if (allTypes || types.contains(TableType.TABLE.name())) {
schemaMgr.dataTables().stream()
.filter(t -> matches(t.getSchema().getName(), schemaNamePtrn))
.filter(t -> matches(t.getName(), tblNamePtrn))
.map(t -> {
int cacheGrpId = t.cacheInfo().groupId();
CacheGroupDescriptor cacheGrpDesc = ctx.cache().cacheGroupDescriptors().get(cacheGrpId);
// Skip the table if the corresponding cache group has been removed.
if (cacheGrpDesc == null)
return null;
GridQueryTypeDescriptor type = t.rowDescriptor().type();
IndexColumn affCol = t.getExplicitAffinityKeyColumn();
String affinityKeyCol = affCol != null ? affCol.columnName : null;
return new TableInformation(t.getSchema().getName(), t.getName(), TableType.TABLE.name(), cacheGrpId,
cacheGrpDesc.cacheOrGroupName(), t.cacheId(), t.cacheName(), affinityKeyCol,
type.keyFieldAlias(), type.valueFieldAlias(), type.keyTypeName(), type.valueTypeName());
})
.filter(Objects::nonNull)
.forEach(infos::add);
}
if ((allTypes || types.contains(TableType.VIEW.name()))
&& matches(QueryUtils.SCHEMA_SYS, schemaNamePtrn)) {
schemaMgr.systemViews().stream()
.filter(t -> matches(t.getTableName(), tblNamePtrn))
.map(v -> new TableInformation(QueryUtils.SCHEMA_SYS, v.getTableName(), TableType.VIEW.name()))
.forEach(infos::add);
}
return infos;
}
/** {@inheritDoc} */
@Override public Collection<ColumnInformation> columnsInformation(String schemaNamePtrn, String tblNamePtrn,
String colNamePtrn) {
Collection<ColumnInformation> infos = new ArrayList<>();
// Gather information about tables.
schemaMgr.dataTables().stream()
.filter(t -> matches(t.getSchema().getName(), schemaNamePtrn))
.filter(t -> matches(t.getName(), tblNamePtrn))
.flatMap(
tbl -> {
IndexColumn affCol = tbl.getAffinityKeyColumn();
return Stream.of(tbl.getColumns())
.filter(Column::getVisible)
.filter(c -> matches(c.getName(), colNamePtrn))
.map(c -> {
GridQueryProperty prop = tbl.rowDescriptor().type().property(c.getName());
boolean isAff = affCol != null && c.getColumnId() == affCol.column.getColumnId();
return new ColumnInformation(
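// 1-based ordinal of the user-visible column (hidden system columns are excluded).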
c.getColumnId() - QueryUtils.DEFAULT_COLUMNS_COUNT + 1,
tbl.getSchema().getName(),
tbl.getName(),
c.getName(),
prop.type(),
c.isNullable(),
prop.defaultValue(),
prop.precision(),
prop.scale(),
isAff);
});
}
).forEach(infos::add);
// Gather information about system views.
if (matches(QueryUtils.SCHEMA_SYS, schemaNamePtrn)) {
schemaMgr.systemViews().stream()
.filter(v -> matches(v.getTableName(), tblNamePtrn))
.flatMap(
view ->
Stream.of(view.getColumns())
.filter(c -> matches(c.getName(), colNamePtrn))
.map(c -> new ColumnInformation(
c.getColumnId() + 1,
QueryUtils.SCHEMA_SYS,
view.getTableName(),
c.getName(),
IgniteUtils.classForName(DataType.getTypeClassName(c.getType()), Object.class),
c.isNullable(),
null,
(int)c.getPrecision(),
c.getScale(),
false))
).forEach(infos::add);
}
return infos;
}
/** {@inheritDoc} */
@Override public boolean isConvertibleToColumnType(String schemaName, String tblName, String colName, Class<?> cls) {
GridH2Table table = schemaMgr.dataTable(schemaName, tblName);
if (table == null)
throw new IgniteSQLException("Table was not found [schemaName=" + schemaName + ", tableName=" + tblName + ']');
try {
return H2Utils.isConvertableToColumnType(cls, table.getColumn(colName).getType());
}
catch (DbException e) {
throw new IgniteSQLException("Colum with specified name was not found for the table [schemaName=" + schemaName +
", tableName=" + tblName + ", colName=" + colName + ']', e);
}
}
/** {@inheritDoc} */
@Override public boolean isStreamableInsertStatement(String schemaName, SqlFieldsQuery qry) throws SQLException {
QueryParserResult parsed = parser.parse(schemaName, qry, true);
return parsed.isDml() && parsed.dml().streamable() && parsed.remainingQuery() == null;
}
/** {@inheritDoc} */
@Override public void markAsRebuildNeeded(GridCacheContext cctx, boolean val) {
schemaMgr.markIndexRebuild(cctx.name(), val);
}
/**
* @return Busy lock.
*/
public GridSpinBusyLock busyLock() {
return busyLock;
}
/**
* @return Map query executor.
*/
public GridMapQueryExecutor mapQueryExecutor() {
return mapQryExec;
}
/**
* @return Reduce query executor.
*/
public GridReduceQueryExecutor reduceQueryExecutor() {
return rdcQryExec;
}
/** {@inheritDoc} */
@Override public RunningQueryManager runningQueryManager() {
return ctx.query().runningQueryManager();
}
/** {@inheritDoc} */
@SuppressWarnings({"deprecation", "AssignmentToStaticFieldFromInstanceMethod"})
@Override public void start(GridKernalContext ctx, GridSpinBusyLock busyLock) throws IgniteCheckedException {
if (log.isDebugEnabled())
log.debug("Starting cache query index...");
this.busyLock = busyLock;
if (SysProperties.serializeJavaObject) {
U.warn(log, "Serialization of Java objects in H2 was enabled.");
SysProperties.serializeJavaObject = false;
}
this.ctx = ctx;
partReservationMgr = new PartitionReservationManager(ctx);
connMgr = new ConnectionManager(ctx);
longRunningQryMgr = new LongRunningQueryManager(ctx);
parser = new QueryParser(this, connMgr, cmd -> cmdProc.isCommandSupported(cmd));
schemaMgr = new SchemaManager(ctx, connMgr);
schemaMgr.start(ctx.config().getSqlConfiguration().getSqlSchemas());
statsMgr = new IgniteStatisticsManagerImpl(ctx, schemaMgr);
nodeId = ctx.localNodeId();
marshaller = ctx.config().getMarshaller();
mapQryExec = new GridMapQueryExecutor();
rdcQryExec = new GridReduceQueryExecutor();
mapQryExec.start(ctx, this);
rdcQryExec.start(ctx, this);
discoLsnr = evt -> {
mapQryExec.onNodeLeft((DiscoveryEvent)evt);
rdcQryExec.onNodeLeft((DiscoveryEvent)evt);
};
ctx.event().addLocalEventListener(discoLsnr, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
qryLsnr = (nodeId, msg, plc) -> onMessage(nodeId, msg);
ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, qryLsnr);
partExtractor = new PartitionExtractor(new H2PartitionResolver(this), ctx);
cmdProc = new CommandProcessor(ctx, schemaMgr, this);
if (JdbcUtils.serializer != null)
U.warn(log, "Custom H2 serialization is already configured, will override.");
JdbcUtils.serializer = h2Serializer();
distrCfg = new DistributedSqlConfiguration(ctx, log);
funcMgr = new FunctionsManager(distrCfg);
ctx.maintenanceRegistry()
.registerWorkflowCallbackIfTaskExists(
INDEX_REBUILD_MNTC_TASK_NAME,
task -> new RebuildIndexWorkflowCallback(
parseMaintenanceTaskParameters(task.parameters()),
this,
log
)
);
}
/**
* @param nodeId Node ID.
* @param msg Message.
*/
public void onMessage(UUID nodeId, Object msg) {
assert msg != null;
ClusterNode node = ctx.discovery().node(nodeId);
if (node == null)
return; // Node left, ignore.
if (!busyLock.enterBusy())
return;
try {
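// Query messages may carry a marshalled payload; unmarshall it before dispatching.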
if (msg instanceof GridCacheQueryMarshallable)
((GridCacheQueryMarshallable)msg).unmarshall(ctx.config().getMarshaller(), ctx);
try {
boolean processed = true;
boolean traceableMsg = false;
if (msg instanceof GridQueryNextPageRequest) {
mapQueryExecutor().onNextPageRequest(node, (GridQueryNextPageRequest)msg);
traceableMsg = true;
}
else if (msg instanceof GridQueryNextPageResponse) {
reduceQueryExecutor().onNextPage(node, (GridQueryNextPageResponse)msg);
traceableMsg = true;
}
else if (msg instanceof GridH2QueryRequest)
mapQueryExecutor().onQueryRequest(node, (GridH2QueryRequest)msg);
else if (msg instanceof GridH2DmlRequest)
mapQueryExecutor().onDmlRequest(node, (GridH2DmlRequest)msg);
else if (msg instanceof GridH2DmlResponse)
reduceQueryExecutor().onDmlResponse(node, (GridH2DmlResponse)msg);
else if (msg instanceof GridQueryFailResponse)
reduceQueryExecutor().onFail(node, (GridQueryFailResponse)msg);
else if (msg instanceof GridQueryCancelRequest)
mapQueryExecutor().onCancel(node, (GridQueryCancelRequest)msg);
else
processed = false;
if (processed && log.isDebugEnabled() && (!traceableMsg || log.isTraceEnabled()))
log.debug("Processed message: [srcNodeId=" + nodeId + ", msg=" + msg + ']');
}
catch (Throwable th) {
U.error(log, "Failed to process message: [srcNodeId=" + nodeId + ", msg=" + msg + ']', th);
}
}
finally {
busyLock.leaveBusy();
}
}
/**
* @return Value object context.
*/
public CacheObjectValueContext objectContext() {
return ctx.query().objectContext();
}
/**
* @param topic Topic.
* @param topicOrd Topic ordinal for {@link GridTopic}.
* @param nodes Nodes.
* @param msg Message.
* @param specialize Optional closure to specialize message for each node.
* @param locNodeHnd Handler for local node.
* @param plc Policy identifying the executor service which will process message.
* @param runLocParallel Run local handler in parallel thread.
* @return {@code true} if all messages were sent successfully.
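*
* <p>A hedged usage sketch (the {@code nodes}, {@code msg} and {@code locNodeHnd} values, as
* well as the chosen I/O policy, are illustrative assumptions):
* <pre>{@code
* boolean ok = send(
*     GridTopic.TOPIC_QUERY,           // topic
*     GridTopic.TOPIC_QUERY.ordinal(), // topic ordinal
*     nodes,                           // target nodes, may include the local one
*     msg,                             // message to send
*     null,                            // no per-node specialization
*     locNodeHnd,                      // local node handler
*     GridIoPolicy.QUERY_POOL,         // executor policy (assumed)
*     false);                          // run the local handler in the same thread
* }</pre>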
*/
public boolean send(
Object topic,
int topicOrd,
Collection<ClusterNode> nodes,
Message msg,
@Nullable IgniteBiClosure<ClusterNode, Message, Message> specialize,
@Nullable final IgniteInClosure2X<ClusterNode, Message> locNodeHnd,
byte plc,
boolean runLocParallel
) {
boolean ok = true;
if (specialize == null && msg instanceof GridCacheQueryMarshallable)
((GridCacheQueryMarshallable)msg).marshall(marshaller);
ClusterNode locNode = null;
for (ClusterNode node : nodes) {
if (node.isLocal()) {
if (locNode != null)
throw new IllegalStateException();
locNode = node;
continue;
}
try {
if (specialize != null) {
msg = specialize.apply(node, msg);
if (msg instanceof GridCacheQueryMarshallable)
((GridCacheQueryMarshallable)msg).marshall(marshaller);
}
ctx.io().sendGeneric(node, topic, topicOrd, msg, plc);
}
catch (IgniteCheckedException e) {
ok = false;
U.warn(log, "Failed to send message [node=" + node + ", msg=" + msg +
", errMsg=" + e.getMessage() + "]");
}
}
// Local node goes last to allow parallel execution.
if (locNode != null) {
assert locNodeHnd != null;
if (specialize != null) {
msg = specialize.apply(locNode, msg);
if (msg instanceof GridCacheQueryMarshallable)
((GridCacheQueryMarshallable)msg).marshall(marshaller);
}
if (runLocParallel) {
final ClusterNode finalLocNode = locNode;
final Message finalMsg = msg;
try {
// We prefer runLocal to runLocalSafe, because the latter can produce deadlock here.
ctx.closure().runLocal(new GridPlainRunnable() {
@Override public void run() {
if (!busyLock.enterBusy())
return;
try {
locNodeHnd.apply(finalLocNode, finalMsg);
}
finally {
busyLock.leaveBusy();
}
}
}, plc).listen(logger);
}
catch (IgniteCheckedException e) {
ok = false;
U.error(log, "Failed to execute query locally.", e);
}
}
else
locNodeHnd.apply(locNode, msg);
}
return ok;
}
/**
* @return Serializer.
*/
private JavaObjectSerializer h2Serializer() {
return new H2JavaObjectSerializer();
}
/** {@inheritDoc} */
@Override public void stop() {
if (log.isDebugEnabled())
log.debug("Stopping cache query index...");
mapQryExec.stop();
qryCtxRegistry.clearSharedOnLocalNodeStop();
schemaMgr.stop();
longRunningQryMgr.stop();
connMgr.stop();
statsMgr.stop();
if (log.isDebugEnabled())
log.debug("Cache query index stopped.");
}
/** {@inheritDoc} */
@Override public void onClientDisconnect() throws IgniteCheckedException {
if (!mvccEnabled(ctx))
return;
GridNearTxLocal tx = tx(ctx);
if (tx != null)
cmdProc.doRollback(tx);
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public boolean initCacheContext(GridCacheContext cacheCtx) {
GridCacheContextInfo cacheInfo = registeredCacheInfo(cacheCtx.name());
if (cacheInfo != null) {
assert !cacheInfo.isCacheContextInited() : cacheInfo.name();
assert cacheInfo.name().equals(cacheCtx.name()) : cacheInfo.name() + " != " + cacheCtx.name();
cacheInfo.initCacheContext(cacheCtx);
return true;
}
return false;
}
/** {@inheritDoc} */
@Override public void registerCache(String cacheName, String schemaName, GridCacheContextInfo<?, ?> cacheInfo)
throws IgniteCheckedException {
ctx.indexProcessor().idxRowCacheRegistry().onCacheRegistered(cacheInfo);
schemaMgr.onCacheCreated(cacheName, schemaName, cacheInfo.config().getSqlFunctionClasses());
}
/** {@inheritDoc} */
@Override public void unregisterCache(GridCacheContextInfo cacheInfo, boolean rmvIdx, boolean clearIdx) {
ctx.indexProcessor().unregisterCache(cacheInfo);
String cacheName = cacheInfo.name();
partReservationMgr.onCacheStop(cacheName);
// Drop schema (needs to be called after callback to DML processor because the latter depends on schema).
schemaMgr.onCacheDestroyed(cacheName, rmvIdx, clearIdx);
// Unregister connection.
connMgr.onCacheDestroyed();
// Clear query cache.
clearPlanCache();
}
/**
* Remove all cached queries from cached two-steps queries.
*/
private void clearPlanCache() {
parser.clearCache();
}
/** {@inheritDoc} */
@Override public IndexingQueryFilter backupFilter(@Nullable final AffinityTopologyVersion topVer,
@Nullable final int[] parts) {
return backupFilter(topVer, parts, false);
}
/**
* Returns backup filter.
*
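* <p>For example, to obtain a filter that skips backup partition copies on the current
* ready topology (a sketch; {@code idx} stands for this indexing instance):
* <pre>{@code
* IndexingQueryFilter filter = idx.backupFilter(idx.readyTopologyVersion(), null, false);
* }</pre>
*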
* @param topVer Topology version.
* @param parts Partitions.
* @param treatReplicatedAsPartitioned {@code true} if replicated caches should be treated as partitioned (for outer joins).
* @return Backup filter.
*/
public IndexingQueryFilter backupFilter(@Nullable final AffinityTopologyVersion topVer, @Nullable final int[] parts,
boolean treatReplicatedAsPartitioned) {
return new IndexingQueryFilterImpl(ctx, topVer, parts, treatReplicatedAsPartitioned);
}
/**
* @return Ready topology version.
*/
public AffinityTopologyVersion readyTopologyVersion() {
return ctx.cache().context().exchange().readyAffinityVersion();
}
/**
* @param readyVer Ready topology version.
*
* @return {@code true} if a pending distributed exchange exists because the server topology has changed.
*/
public boolean serverTopologyChanged(AffinityTopologyVersion readyVer) {
GridDhtPartitionsExchangeFuture fut = ctx.cache().context().exchange().lastTopologyFuture();
if (fut.isDone())
return false;
AffinityTopologyVersion initVer = fut.initialVersion();
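// A newer exchange is in progress and it was initiated by a server node joining or leaving.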
return initVer.compareTo(readyVer) > 0 && !fut.firstEvent().node().isClient();
}
/**
* @param topVer Topology version.
* @throws IgniteCheckedException If failed.
*/
public void awaitForReadyTopologyVersion(AffinityTopologyVersion topVer) throws IgniteCheckedException {
ctx.cache().context().exchange().affinityReadyFuture(topVer).get();
}
/** {@inheritDoc} */
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
rdcQryExec.onDisconnected(reconnectFut);
}
/** {@inheritDoc} */
@Override public void onKernalStop() {
connMgr.onKernalStop();
ctx.io().removeMessageListener(GridTopic.TOPIC_QUERY, qryLsnr);
ctx.event().removeLocalEventListener(discoLsnr);
}
/**
* @return Query context registry.
*/
public QueryContextRegistry queryContextRegistry() {
return qryCtxRegistry;
}
/**
* @return Connection manager.
*/
public ConnectionManager connections() {
return connMgr;
}
/**
* @return Parser.
*/
public QueryParser parser() {
return parser;
}
/**
* @return Schema manager.
*/
public SchemaManager schemaManager() {
return schemaMgr;
}
/**
* @return Partition extractor.
*/
public PartitionExtractor partitionExtractor() {
return partExtractor;
}
/**
* @return Partition reservation manager.
*/
public PartitionReservationManager partitionReservationManager() {
return partReservationMgr;
}
/**
* @param qryId Query id.
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param dml DML statement.
* @param cancel Query cancel.
* @return Cursors representing the update results.
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings("unchecked")
private List<QueryCursorImpl<List<?>>> executeUpdateDistributed(
long qryId,
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultDml dml,
GridQueryCancel cancel
) throws IgniteCheckedException {
if (qryDesc.batched()) {
Collection<UpdateResult> ress;
List<Object[]> argss = qryParams.batchedArguments();
UpdatePlan plan = dml.plan();
GridCacheContext<?, ?> cctx = plan.cacheContext();
// In the MVCC case batch elements are enlisted one by one, so the batched path below is non-MVCC only.
if (plan.hasRows() && plan.mode() == UpdateMode.INSERT && !cctx.mvccEnabled()) {
CacheOperationContext opCtx = DmlUtils.setKeepBinaryContext(cctx);
try {
List<List<List<?>>> cur = plan.createRows(argss);
// TODO: IGNITE-11176 - Need to support cancellation
ress = DmlUtils.processSelectResultBatched(plan, cur, qryParams.updateBatchSize());
}
finally {
DmlUtils.restoreKeepBinaryContext(cctx, opCtx);
}
}
else {
// Fallback to previous mode.
ress = new ArrayList<>(argss.size());
SQLException batchException = null;
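// Per-row update counters, following the JDBC batch contract: failed rows are
// marked with Statement.EXECUTE_FAILED and reported via BatchUpdateException.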
int[] cntPerRow = new int[argss.size()];
int cntr = 0;
for (Object[] args : argss) {
UpdateResult res;
try {
res = executeUpdate(
qryId,
qryDesc,
qryParams.toSingleBatchedArguments(args),
dml,
false,
null,
cancel
);
cntPerRow[cntr++] = (int)res.counter();
ress.add(res);
}
catch (Exception e) {
SQLException sqlEx = QueryUtils.toSqlException(e);
batchException = DmlUtils.chainException(batchException, sqlEx);
cntPerRow[cntr++] = Statement.EXECUTE_FAILED;
}
}
if (batchException != null) {
BatchUpdateException e = new BatchUpdateException(batchException.getMessage(),
batchException.getSQLState(), batchException.getErrorCode(), cntPerRow, batchException);
throw new IgniteCheckedException(e);
}
}
ArrayList<QueryCursorImpl<List<?>>> resCurs = new ArrayList<>(ress.size());
for (UpdateResult res : ress) {
res.throwIfError();
QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(singletonList(
singletonList(res.counter())), cancel, false, false);
resCur.fieldsMeta(UPDATE_RESULT_META);
resCurs.add(resCur);
}
return resCurs;
}
else {
UpdateResult res = executeUpdate(
qryId,
qryDesc,
qryParams,
dml,
false,
null,
cancel
);
res.throwIfError();
QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(singletonList(
singletonList(res.counter())), cancel, false, false);
resCur.fieldsMeta(UPDATE_RESULT_META);
resCur.partitionResult(res.partitionResult());
return singletonList(resCur);
}
}
/**
* Executes a DML statement, possibly with a few re-attempts in case of concurrent data modifications.
*
* @param qryId Query id.
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param dml DML command.
* @param loc Query locality flag.
* @param filters Cache name and key filter.
* @param cancel Cancel.
* @return Update result (modified items count and failed keys).
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings("IfMayBeConditional")
private UpdateResult executeUpdate(
long qryId,
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultDml dml,
boolean loc,
IndexingQueryFilter filters,
GridQueryCancel cancel
) throws IgniteCheckedException {
Object[] errKeys = null;
long items = 0;
PartitionResult partRes = null;
GridCacheContext<?, ?> cctx = dml.plan().cacheContext();
boolean transactional = cctx != null && cctx.mvccEnabled();
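// Non-transactional updates are re-run for keys that failed due to concurrent
// modifications; transactional (MVCC) updates are attempted only once.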
int maxRetryCnt = transactional ? 1 : DFLT_UPDATE_RERUN_ATTEMPTS;
for (int i = 0; i < maxRetryCnt; i++) {
CacheOperationContext opCtx = cctx != null ? DmlUtils.setKeepBinaryContext(cctx) : null;
UpdateResult r;
try {
if (transactional)
r = executeUpdateTransactional(
qryId,
qryDesc,
qryParams,
dml,
loc,
cancel
);
else
r = executeUpdateNonTransactional(
qryId,
qryDesc,
qryParams,
dml,
loc,
filters,
cancel
);
}
finally {
if (opCtx != null)
DmlUtils.restoreKeepBinaryContext(cctx, opCtx);
}
items += r.counter();
errKeys = r.errorKeys();
partRes = r.partitionResult();
if (F.isEmpty(errKeys))
break;
}
if (F.isEmpty(errKeys) && partRes == null) {
if (items == 1L)
return UpdateResult.ONE;
else if (items == 0L)
return UpdateResult.ZERO;
}
return new UpdateResult(items, errKeys, partRes);
}
/**
* Executes an update in non-transactional mode.
*
* @param qryId Query id.
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param dml Plan.
* @param loc Local flag.
* @param filters Filters.
* @param cancel Cancel hook.
* @return Update result.
* @throws IgniteCheckedException If failed.
*/
private UpdateResult executeUpdateNonTransactional(
long qryId,
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultDml dml,
boolean loc,
IndexingQueryFilter filters,
GridQueryCancel cancel
) throws IgniteCheckedException {
UpdatePlan plan = dml.plan();
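// Try the fast path first (e.g. a single-row update/delete by key), which requires no SELECT.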
UpdateResult fastUpdateRes = plan.processFast(qryParams.arguments());
if (fastUpdateRes != null)
return fastUpdateRes;
DmlDistributedPlanInfo distributedPlan = loc ? null : plan.distributedPlan();
if (distributedPlan != null) {
if (cancel == null)
cancel = new GridQueryCancel();
UpdateResult result = rdcQryExec.update(
qryDesc.schemaName(),
distributedPlan.getCacheIds(),
qryDesc.sql(),
qryParams.arguments(),
qryDesc.enforceJoinOrder(),
qryParams.pageSize(),
qryParams.timeout(),
qryParams.partitions(),
distributedPlan.isReplicatedOnly(),
cancel
);
// Null is returned if not all nodes support distributed DML.
if (result != null)
return result;
}
final GridQueryCancel selectCancel = (cancel != null) ? new GridQueryCancel() : null;
if (cancel != null)
cancel.add(selectCancel::cancel);
SqlFieldsQuery selectFieldsQry = new SqlFieldsQuery(plan.selectQuery(), qryDesc.collocated())
.setArgs(qryParams.arguments())
.setDistributedJoins(qryDesc.distributedJoins())
.setEnforceJoinOrder(qryDesc.enforceJoinOrder())
.setLocal(qryDesc.local())
.setPageSize(qryParams.pageSize())
.setTimeout(qryParams.timeout(), TimeUnit.MILLISECONDS)
// In non-MVCC mode we cannot use lazy mode when the UPDATE query references updated columns
// in the WHERE condition, because the same entry may be updated several times
// (when an index over such columns is selected for the scan):
// e.g.: UPDATE test SET val = val + 1 WHERE val >= ?
.setLazy(qryParams.lazy() && plan.canSelectBeLazy());
Iterable<List<?>> cur;
// Do a two-step query only if the locality flag is not set AND the plan's SELECT corresponds
// to an actual sub-query rather than a constant row set like "SELECT 1, 2, 3".
if (!loc && !plan.isLocalSubquery()) {
assert !F.isEmpty(plan.selectQuery());
cur = executeSelectForDml(
qryId,
qryDesc.schemaName(),
selectFieldsQry,
null,
selectCancel,
qryParams.timeout()
);
}
else if (plan.hasRows())
cur = plan.createRows(qryParams.arguments());
else {
selectFieldsQry.setLocal(true);
QueryParserResult selectParseRes = parser.parse(qryDesc.schemaName(), selectFieldsQry, false);
final GridQueryFieldsResult res = executeSelectLocal(
qryId,
selectParseRes.queryDescriptor(),
selectParseRes.queryParameters(),
selectParseRes.select(),
filters,
null,
selectCancel,
false,
qryParams.timeout()
);
cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
try {
return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), true);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
}, cancel, true, qryParams.lazy());
}
int pageSize = qryParams.updateBatchSize();
// TODO: IGNITE-11176 - Need to support cancellation
try {
return DmlUtils.processSelectResult(plan, cur, pageSize);
}
finally {
if (cur instanceof AutoCloseable)
U.closeQuiet((AutoCloseable)cur);
}
}
/**
* Executes an update in transactional mode.
*
* @param qryId Query id.
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param dml Plan.
* @param loc Local flag.
* @param cancel Cancel hook.
* @return Update result.
* @throws IgniteCheckedException If failed.
*/
private UpdateResult executeUpdateTransactional(
long qryId,
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultDml dml,
boolean loc,
GridQueryCancel cancel
) throws IgniteCheckedException {
UpdatePlan plan = dml.plan();
GridCacheContext cctx = plan.cacheContext();
assert cctx != null;
assert cctx.transactional();
GridNearTxLocal tx = tx(ctx);
boolean implicit = (tx == null);
boolean commit = implicit && qryParams.autoCommit();
if (implicit)
tx = txStart(cctx, qryParams.timeout());
requestSnapshot(tx);
try (GridNearTxLocal toCommit = commit ? tx : null) {
DmlDistributedPlanInfo distributedPlan = loc ? null : plan.distributedPlan();
long timeout = implicit
? tx.remainingTime()
: operationTimeout(qryParams.timeout(), tx);
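// Enlist rows through a local iterator when the cache is replicated, there is no
// distributed plan, or INSERT/MERGE rows are produced locally; otherwise delegate
// the update to the data nodes.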
if (cctx.isReplicated() || distributedPlan == null || ((plan.mode() == UpdateMode.INSERT
|| plan.mode() == UpdateMode.MERGE) && !plan.isLocalSubquery())) {
boolean sequential = true;
UpdateSourceIterator<?> it;
if (plan.fastResult()) {
IgniteBiTuple row = plan.getFastRow(qryParams.arguments());
assert row != null;
EnlistOperation op = UpdatePlan.enlistOperation(plan.mode());
it = new DmlUpdateSingleEntryIterator<>(op, op.isDeleteOrLock() ? row.getKey() : row);
}
else if (plan.hasRows()) {
it = new DmlUpdateResultsIterator(
UpdatePlan.enlistOperation(plan.mode()),
plan,
plan.createRows(qryParams.arguments())
);
}
else {
SqlFieldsQuery selectFieldsQry = new SqlFieldsQuery(plan.selectQuery(), qryDesc.collocated())
.setArgs(qryParams.arguments())
.setDistributedJoins(qryDesc.distributedJoins())
.setEnforceJoinOrder(qryDesc.enforceJoinOrder())
.setLocal(qryDesc.local())
.setPageSize(qryParams.pageSize())
.setTimeout((int)timeout, TimeUnit.MILLISECONDS)
// In MVCC mode lazy execution can always be used (when requested), regardless of
// the updated columns and the WHERE condition.
.setLazy(qryParams.lazy());
FieldsQueryCursor<List<?>> cur = executeSelectForDml(
qryId,
qryDesc.schemaName(),
selectFieldsQry,
MvccUtils.mvccTracker(cctx, tx),
cancel,
(int)timeout
);
it = plan.iteratorForTransaction(connMgr, cur);
}
// TODO: IGNITE-11176 - Need to support cancellation
IgniteInternalFuture<Long> fut = tx.updateAsync(
cctx,
it,
qryParams.pageSize(),
timeout,
sequential
);
UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY,
plan.distributedPlan() != null ? plan.distributedPlan().derivedPartitions() : null);
if (commit)
toCommit.commit();
return res;
}
int[] ids = U.toIntArray(distributedPlan.getCacheIds());
int flags = 0;
if (qryDesc.enforceJoinOrder())
flags |= GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER;
if (distributedPlan.isReplicatedOnly())
flags |= GridH2QueryRequest.FLAG_REPLICATED;
if (qryParams.lazy())
flags |= GridH2QueryRequest.FLAG_LAZY;
flags = GridH2QueryRequest.setDataPageScanEnabled(flags,
qryParams.dataPageScanEnabled());
int[] parts = PartitionResult.calculatePartitions(
qryParams.partitions(),
distributedPlan.derivedPartitions(),
qryParams.arguments()
);
if (parts != null && parts.length == 0)
return new UpdateResult(0, X.EMPTY_OBJECT_ARRAY, distributedPlan.derivedPartitions());
else {
// TODO: IGNITE-11176 - Need to support cancellation
IgniteInternalFuture<Long> fut = tx.updateAsync(
cctx,
ids,
parts,
qryDesc.schemaName(),
qryDesc.sql(),
qryParams.arguments(),
flags,
qryParams.pageSize(),
timeout
);
UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY,
distributedPlan.derivedPartitions());
if (commit)
toCommit.commit();
return res;
}
}
catch (ClusterTopologyServerNotFoundException e) {
throw new CacheServerNotFoundException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
IgniteSQLException sqlEx = X.cause(e, IgniteSQLException.class);
if (sqlEx != null)
throw sqlEx;
Exception ex = IgniteUtils.convertExceptionNoWrap(e);
if (ex instanceof IgniteException)
throw (IgniteException)ex;
U.error(log, "Error during update [localNodeId=" + ctx.localNodeId() + "]", ex);
throw new IgniteSQLException("Failed to run update. " + ex.getMessage(), ex);
}
finally {
if (commit)
cctx.tm().resetContext();
}
}
/** {@inheritDoc} */
@Override public void registerMxBeans(IgniteMBeansManager mbMgr) throws IgniteCheckedException {
SqlQueryMXBean qryMXBean = new SqlQueryMXBeanImpl(ctx);
mbMgr.registerMBean("SQL Query", qryMXBean.getClass().getSimpleName(), qryMXBean, SqlQueryMXBean.class);
}
/**
* @return Long running queries manager.
*/
public LongRunningQueryManager longRunningQueries() {
return longRunningQryMgr;
}
/** {@inheritDoc} */
@Override public long indexSize(String schemaName, String tblName, String idxName) throws IgniteCheckedException {
GridH2Table tbl = schemaMgr.dataTable(schemaName, tblName);
if (tbl == null)
return 0;
H2TreeIndex idx = (H2TreeIndex)tbl.userIndex(idxName);
return idx == null ? 0 : idx.size();
}
/**
* @return Distributed SQL configuration.
*/
public DistributedSqlConfiguration distributedConfiguration() {
return distrCfg;
}
/** {@inheritDoc} */
@Override public Map<String, Integer> secondaryIndexesInlineSize() {
Map<String, Integer> map = new HashMap<>();
for (GridH2Table table : schemaMgr.dataTables()) {
for (Index index : table.getIndexes()) {
if (index instanceof H2TreeIndexBase && !index.getIndexType().isPrimaryKey()) {
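// Map key format: "<schema>#<table>#<index>".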
map.put(
index.getSchema().getName() + "#" + index.getTable().getName() + "#" + index.getName(),
((H2TreeIndexBase)index).inlineSize()
);
}
}
}
return map;
}
/**
* @return Statistics manager.
*/
public IgniteStatisticsManager statsManager() {
return statsMgr;
}
}