blob: 47c4f59f672aa1765433fdb6da7deb306ce9b88e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.h2;
import java.sql.BatchUpdateException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.events.SqlQueryExecutionEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcParameterMeta;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
import org.apache.ignite.internal.processors.query.GridQueryFinishedInfo;
import org.apache.ignite.internal.processors.query.GridQueryIndexing;
import org.apache.ignite.internal.processors.query.GridQueryStartedInfo;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryField;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.SqlClientContext;
import org.apache.ignite.internal.processors.query.h2.affinity.H2PartitionResolver;
import org.apache.ignite.internal.processors.query.h2.affinity.PartitionExtractor;
import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo;
import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode;
import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlConst;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker;
import org.apache.ignite.internal.processors.query.running.RunningQueryManager;
import org.apache.ignite.internal.processors.query.schema.AbstractSchemaChangeListener;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.sql.SqlParseException;
import org.apache.ignite.internal.sql.command.SqlCommand;
import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult;
import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
import org.h2.api.ErrorCode;
import org.h2.api.JavaObjectSerializer;
import org.h2.engine.Session;
import org.h2.engine.SysProperties;
import org.h2.util.JdbcUtils;
import org.h2.value.CompareMode;
import org.jetbrains.annotations.Nullable;
import static java.util.Collections.singletonList;
import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
import static org.apache.ignite.internal.processors.query.h2.H2Utils.UPDATE_RESULT_META;
import static org.apache.ignite.internal.processors.query.h2.H2Utils.generateFieldsQueryString;
import static org.apache.ignite.internal.processors.query.h2.H2Utils.session;
import static org.apache.ignite.internal.processors.query.h2.H2Utils.zeroCursor;
import static org.apache.ignite.internal.processors.tracing.SpanTags.ERROR;
import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_QRY_TEXT;
import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_SCHEMA;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_CMD_QRY_EXECUTE;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_CURSOR_OPEN;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_DML_QRY_EXECUTE;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_ITER_OPEN;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_QRY;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_QRY_EXECUTE;
/**
* Indexing implementation based on H2 database engine. In this implementation main query language is SQL,
* fulltext indexing can be performed using Lucene.
* <p>
* For each registered {@link GridQueryTypeDescriptor} this SPI will create respective SQL table with
* {@code '_key'} and {@code '_val'} fields for key and value, and fields from
* {@link GridQueryTypeDescriptor#fields()}.
* For each table it will create indexes declared in {@link GridQueryTypeDescriptor#indexes()}.
*/
public class IgniteH2Indexing implements GridQueryIndexing {
/** Default number of attempts to re-run DELETE and UPDATE queries in case of concurrent modifications of values. */
private static final int DFLT_UPDATE_RERUN_ATTEMPTS = 4;
/** Cached value of {@code IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION}. */
private final boolean updateInTxAllowed =
Boolean.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION);
static {
// Required to skip checks of forbidden H2 settings, otherwise Ignite fails to start.
//
// Note, H2 system properties must be overriden here, because the properties are finalized while the class
// org.h2.engine.SysProperties is loaded in the IgniteH2Indexing.start(...) method.
//
// @see ConnectionManager#forbidH2DbSettings(String...)
System.setProperty("h2.check", "false");
}
/** Logger. */
@LoggerResource
private IgniteLogger log;
/** Node ID. */
private UUID nodeId;
/** */
private Marshaller marshaller;
/** */
private GridMapQueryExecutor mapQryExec;
/** */
private GridReduceQueryExecutor rdcQryExec;
/** */
private GridSpinBusyLock busyLock;
/** */
protected volatile GridKernalContext ctx;
/** Query context registry. */
private final QueryContextRegistry qryCtxRegistry = new QueryContextRegistry();
/** Processor to execute commands which are neither SELECT, nor DML. */
private CommandProcessor cmdProc;
/** Partition reservation manager. */
private PartitionReservationManager partReservationMgr;
/** Partition extractor. */
private PartitionExtractor partExtractor;
/** Parser. */
private QueryParser parser;
/** */
private final IgniteInClosure<? super IgniteInternalFuture<?>> logger = new IgniteInClosure<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
try {
fut.get();
}
catch (IgniteCheckedException e) {
U.error(log, e.getMessage(), e);
}
}
};
/** Query executor. */
private ConnectionManager connMgr;
/** Schema manager. */
private H2SchemaManager schemaMgr;
/** Heavy queries tracker. */
private HeavyQueriesTracker heavyQryTracker;
/** Discovery event listener. */
private GridLocalEventListener discoLsnr;
/** Query message listener. */
private GridMessageListener qryLsnr;
/** Distributed config. */
private DistributedIndexingConfiguration distrCfg;
/** Functions manager. */
private FunctionsManager funcMgr;
/**
* @return Kernal context.
*/
public GridKernalContext kernalContext() {
return ctx;
}
/** {@inheritDoc} */
@Override public List<JdbcParameterMeta> parameterMetaData(String schemaName, SqlFieldsQuery qry)
throws IgniteSQLException {
assert qry != null;
ArrayList<JdbcParameterMeta> metas = new ArrayList<>();
SqlFieldsQuery curQry = qry;
while (curQry != null) {
QueryParserResult parsed = parser.parse(schemaName, curQry, true);
metas.addAll(parsed.parametersMeta());
curQry = parsed.remainingQuery();
}
return metas;
}
/** {@inheritDoc} */
@Override public List<GridQueryFieldMetadata> resultMetaData(String schemaName, SqlFieldsQuery qry)
throws IgniteSQLException {
QueryParserResult parsed = parser.parse(schemaName, qry, true);
if (parsed.remainingQuery() != null)
return null;
if (parsed.isSelect())
return parsed.select().meta();
return null;
}
/** {@inheritDoc} */
@Override public void store(GridCacheContext cctx,
GridQueryTypeDescriptor type,
CacheDataRow row,
@Nullable CacheDataRow prevRow,
boolean prevRowAvailable
) throws IgniteCheckedException {
GridH2Table tbl = schemaMgr.dataTable(type.schemaName(), type.tableName());
if (tbl == null)
return; // Type was rejected.
if (tbl.tableDescriptor().luceneIndex() != null) {
long expireTime = row.expireTime();
if (expireTime == 0L)
expireTime = Long.MAX_VALUE;
tbl.tableDescriptor().luceneIndex().store(row.key(), row.value(), row.version(), expireTime);
}
tbl.update(row, prevRow);
}
/** {@inheritDoc} */
@Override public void remove(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow row)
throws IgniteCheckedException {
if (log.isDebugEnabled()) {
log.debug("Removing key from cache query index [locId=" + nodeId +
", key=" + row.key() +
", val=" + row.value() + ']');
}
GridH2Table tbl = schemaMgr.dataTable(type.schemaName(), type.tableName());
if (tbl == null)
return;
if (tbl.tableDescriptor().luceneIndex() != null)
tbl.tableDescriptor().luceneIndex().remove(row.key());
tbl.remove(row);
}
/** {@inheritDoc} */
@Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText(String schemaName,
String cacheName, String qry, String typeName, IndexingQueryFilter filters, int limit) throws IgniteCheckedException {
H2TableDescriptor tbl = schemaMgr.tableForType(schemaName, cacheName, typeName);
if (tbl != null && tbl.luceneIndex() != null) {
long qryId = runningQueryManager().register(
qry,
TEXT,
schemaName,
true,
null,
null,
false,
false,
false
);
Throwable failReason = null;
try {
return tbl.luceneIndex().query(qry.toUpperCase(), filters, limit);
}
catch (Throwable t) {
failReason = t;
throw t;
}
finally {
runningQueryManager().unregister(qryId, failReason);
}
}
return new GridEmptyCloseableIterator<>();
}
/**
* Queries individual fields (generally used by JDBC drivers).
*
* @param qryId Query id.
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param select Select.
* @param filter Cache name and key filter.
* @param cancel Query cancel.
* @param timeout Timeout.
* @return Query result.
*/
private GridQueryFieldsResult executeSelectLocal(
long qryId,
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultSelect select,
final IndexingQueryFilter filter,
GridQueryCancel cancel,
int timeout
) {
String qry = qryDesc.sql();
assert select != null;
if (ctx.security().enabled())
checkSecurity(select.cacheIds());
final QueryContext qctx = new QueryContext(
0,
filter,
null,
null,
null,
true
);
return new GridQueryFieldsResultAdapter(select.meta(), null) {
@Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException {
H2PooledConnection conn = connections().connection(qryDesc.schemaName());
try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_ITER_OPEN, MTC.span()))) {
H2Utils.setupConnection(conn, qctx,
qryDesc.distributedJoins(), qryDesc.enforceJoinOrder(), qryParams.lazy());
PreparedStatement stmt = conn.prepareStatement(qry, H2StatementCache.queryFlags(qryDesc));
// Convert parameters into BinaryObjects.
Marshaller m = ctx.config().getMarshaller();
byte[] paramsBytes = U.marshal(m, qryParams.arguments());
final ClassLoader ldr = U.resolveClassLoader(ctx.config());
Object[] params;
if (m instanceof BinaryMarshaller) {
params = BinaryUtils.rawArrayFromBinary(((BinaryMarshaller)m).binaryMarshaller()
.unmarshal(paramsBytes, ldr));
}
else
params = U.unmarshal(m, paramsBytes, ldr);
H2Utils.bindParameters(stmt, F.asList(params));
H2QueryInfo qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry,
ctx.localNodeId(), qryId);
if (ctx.performanceStatistics().enabled()) {
ctx.performanceStatistics().queryProperty(
GridCacheQueryType.SQL_FIELDS,
qryInfo.nodeId(),
qryInfo.queryId(),
"Local plan",
qryInfo.plan()
);
}
ResultSet rs = executeSqlQueryWithTimer(
stmt,
conn,
qry,
timeout,
cancel,
qryParams.dataPageScanEnabled(),
qryInfo
);
return new H2FieldsIterator(
rs,
conn,
qryParams.pageSize(),
log,
IgniteH2Indexing.this,
qryInfo,
ctx.tracing()
);
}
catch (IgniteCheckedException | RuntimeException | Error e) {
conn.close();
throw e;
}
}
};
}
/** {@inheritDoc} */
@Override public long streamUpdateQuery(
String schemaName,
String qry,
@Nullable Object[] params,
IgniteDataStreamer<?, ?> streamer,
String qryInitiatorId
) throws IgniteCheckedException {
QueryParserResultDml dml = streamerParse(schemaName, qry);
return streamQuery0(qry, schemaName, streamer, dml, params, qryInitiatorId);
}
/** {@inheritDoc} */
@SuppressWarnings({"ForLoopReplaceableByForEach", "ConstantConditions"})
@Override public List<Long> streamBatchedUpdateQuery(
String schemaName,
String qry,
List<Object[]> params,
SqlClientContext cliCtx,
String qryInitiatorId
) throws IgniteCheckedException {
if (cliCtx == null || !cliCtx.isStream()) {
U.warn(log, "Connection is not in streaming mode.");
return zeroBatchedStreamedUpdateResult(params.size());
}
QueryParserResultDml dml = streamerParse(schemaName, qry);
IgniteDataStreamer<?, ?> streamer = cliCtx.streamerForCache(dml.streamTable().cacheName());
assert streamer != null;
List<Long> ress = new ArrayList<>(params.size());
for (int i = 0; i < params.size(); i++) {
long res = streamQuery0(qry, schemaName, streamer, dml, params.get(i), qryInitiatorId);
ress.add(res);
}
return ress;
}
/**
* Perform given statement against given data streamer. Only rows based INSERT is supported.
*
* @param qry Query.
* @param schemaName Schema name.
* @param streamer Streamer to feed data to.
* @param dml DML statement.
* @param args Statement arguments.
* @return Number of rows in given INSERT statement.
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings({"unchecked"})
private long streamQuery0(String qry, String schemaName, IgniteDataStreamer streamer, QueryParserResultDml dml,
final Object[] args, String qryInitiatorId) throws IgniteCheckedException {
long qryId = runningQueryManager().register(
QueryUtils.INCLUDE_SENSITIVE ? qry : sqlWithoutConst(dml.statement()),
GridCacheQueryType.SQL_FIELDS,
schemaName,
true,
null,
qryInitiatorId,
false,
false,
false
);
Exception failReason = null;
try {
UpdatePlan plan = dml.plan();
Iterator<List<?>> iter = new GridQueryCacheObjectsIterator(updateQueryRows(qryId, schemaName, plan, args),
objectContext(), true);
if (!iter.hasNext())
return 0;
IgniteBiTuple<?, ?> t = plan.processRow(iter.next());
if (!iter.hasNext()) {
streamer.addData(t.getKey(), t.getValue());
return 1;
}
else {
Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount());
rows.put(t.getKey(), t.getValue());
while (iter.hasNext()) {
List<?> row = iter.next();
t = plan.processRow(row);
rows.put(t.getKey(), t.getValue());
}
streamer.addData(rows);
return rows.size();
}
}
catch (IgniteException | IgniteCheckedException e) {
failReason = e;
throw e;
}
finally {
runningQueryManager().unregister(qryId, failReason);
}
}
/**
* Calculates rows for update query.
*
* @param qryId Query id.
* @param schemaName Schema name.
* @param plan Update plan.
* @param args Statement arguments.
* @return Rows for update.
* @throws IgniteCheckedException If failed.
*/
private Iterator<List<?>> updateQueryRows(long qryId, String schemaName, UpdatePlan plan, Object[] args)
throws IgniteCheckedException {
Object[] params = args != null ? args : X.EMPTY_OBJECT_ARRAY;
if (!F.isEmpty(plan.selectQuery())) {
SqlFieldsQuery selectQry = new SqlFieldsQuery(plan.selectQuery())
.setArgs(params)
.setLocal(true);
QueryParserResult selectParseRes = parser.parse(schemaName, selectQry, false);
GridQueryFieldsResult res = executeSelectLocal(
qryId,
selectParseRes.queryDescriptor(),
selectParseRes.queryParameters(),
selectParseRes.select(),
null,
null,
0
);
return res.iterator();
}
else
return plan.createRows(params).iterator();
}
/**
* Parse statement for streamer.
*
* @param schemaName Schema name.
* @param qry Query.
* @return DML.
*/
private QueryParserResultDml streamerParse(String schemaName, String qry) {
QueryParserResult parseRes = parser.parse(schemaName, new SqlFieldsQuery(qry), false);
QueryParserResultDml dml = parseRes.dml();
if (dml == null || !dml.streamable()) {
throw new IgniteSQLException("Streaming mode supports only INSERT commands without subqueries.",
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
}
return dml;
}
/**
* @param size Result size.
* @return List of given size filled with 0Ls.
*/
private static List<Long> zeroBatchedStreamedUpdateResult(int size) {
Long[] res = new Long[size];
Arrays.fill(res, 0L);
return Arrays.asList(res);
}
/**
* Executes sql query statement.
*
* @param conn Connection,.
* @param stmt Statement.
* @param timeoutMillis Query timeout.
* @param cancel Query cancel.
* @return Result.
* @throws IgniteCheckedException If failed.
*/
private ResultSet executeSqlQuery(final H2PooledConnection conn, final PreparedStatement stmt,
int timeoutMillis, @Nullable GridQueryCancel cancel) throws IgniteCheckedException {
if (cancel != null)
cancel.add(() -> cancelStatement(stmt));
Session ses = session(conn);
if (timeoutMillis >= 0)
ses.setQueryTimeout(timeoutMillis);
else
ses.setQueryTimeout((int)distrCfg.defaultQueryTimeout());
try {
return stmt.executeQuery();
}
catch (SQLException e) {
// Throw special exception.
if (e.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
throw new QueryCancelledException();
if (e.getCause() instanceof IgniteSQLException)
throw (IgniteSQLException)e.getCause();
throw new IgniteSQLException(e);
}
}
/**
* Cancel prepared statement.
*
* @param stmt Statement.
*/
private static void cancelStatement(PreparedStatement stmt) {
try {
stmt.cancel();
}
catch (SQLException ignored) {
// No-op.
}
}
/**
* Executes sql query and prints warning if query is too slow..
*
* @param conn Connection,
* @param sql Sql query.
* @param params Parameters.
* @param timeoutMillis Query timeout.
* @param cancel Query cancel.
* @param dataPageScanEnabled If data page scan is enabled.
* @return Result.
* @throws IgniteCheckedException If failed.
*/
public ResultSet executeSqlQueryWithTimer(
H2PooledConnection conn,
String sql,
@Nullable Collection<Object> params,
int timeoutMillis,
@Nullable GridQueryCancel cancel,
Boolean dataPageScanEnabled,
final H2QueryInfo qryInfo
) throws IgniteCheckedException {
PreparedStatement stmt = conn.prepareStatementNoCache(sql);
H2Utils.bindParameters(stmt, params);
return executeSqlQueryWithTimer(stmt,
conn, sql, timeoutMillis, cancel, dataPageScanEnabled, qryInfo);
}
/**
* @param dataPageScanEnabled If data page scan is enabled.
*/
public void enableDataPageScan(Boolean dataPageScanEnabled) {
// Data page scan is enabled by default for SQL.
// TODO https://issues.apache.org/jira/browse/IGNITE-11998
CacheDataTree.setDataPageScanEnabled(false);
}
/**
* Executes sql query and prints warning if query is too slow.
*
* @param stmt Prepared statement for query.
* @param conn Connection.
* @param sql Sql query.
* @param timeoutMillis Query timeout.
* @param cancel Query cancel.
* @param dataPageScanEnabled If data page scan is enabled.
* @return Result.
* @throws IgniteCheckedException If failed.
*/
public ResultSet executeSqlQueryWithTimer(
PreparedStatement stmt,
H2PooledConnection conn,
String sql,
int timeoutMillis,
@Nullable GridQueryCancel cancel,
Boolean dataPageScanEnabled,
final H2QueryInfo qryInfo
) throws IgniteCheckedException {
if (qryInfo != null)
heavyQryTracker.startTracking(qryInfo);
enableDataPageScan(dataPageScanEnabled);
Throwable err = null;
try (
TraceSurroundings ignored = MTC.support(ctx.tracing()
.create(SQL_QRY_EXECUTE, MTC.span())
.addTag(SQL_QRY_TEXT, () -> sql))
) {
return executeSqlQuery(conn, stmt, timeoutMillis, cancel);
}
catch (Throwable e) {
err = e;
throw e;
}
finally {
CacheDataTree.setDataPageScanEnabled(false);
if (qryInfo != null)
heavyQryTracker.stopTracking(qryInfo, err);
}
}
/** {@inheritDoc} */
@SuppressWarnings("deprecation")
@Override public SqlFieldsQuery generateFieldsQuery(String cacheName, SqlQuery qry) {
String schemaName = ctx.query().schemaManager().schemaName(cacheName);
String type = qry.getType();
H2TableDescriptor tblDesc = schemaMgr.tableForType(schemaName, cacheName, type);
if (tblDesc == null)
throw new IgniteSQLException("Failed to find SQL table for type: " + type,
IgniteQueryErrorCode.TABLE_NOT_FOUND);
String sql;
try {
sql = generateFieldsQueryString(qry.getSql(), qry.getAlias(), tblDesc);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
SqlFieldsQuery res = QueryUtils.withQueryTimeout(new SqlFieldsQuery(sql), qry.getTimeout(), TimeUnit.MILLISECONDS);
res.setArgs(qry.getArgs());
res.setDistributedJoins(qry.isDistributedJoins());
res.setLocal(qry.isLocal());
res.setPageSize(qry.getPageSize());
res.setPartitions(qry.getPartitions());
res.setReplicatedOnly(qry.isReplicatedOnly());
res.setSchema(schemaName);
res.setSql(sql);
return res;
}
/**
* Execute command.
*
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param cliCtx CLient context.
* @param cmd Command (native).
* @return Result.
*/
private FieldsQueryCursor<List<?>> executeCommand(
QueryDescriptor qryDesc,
QueryParameters qryParams,
@Nullable SqlClientContext cliCtx,
QueryParserResultCommand cmd
) {
if (cmd.noOp())
return zeroCursor();
SqlCommand cmdNative = cmd.commandNative();
GridSqlStatement cmdH2 = cmd.commandH2();
if (qryDesc.local()) {
throw new IgniteSQLException("DDL statements are not supported for execution on local nodes only",
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
}
long qryId = registerRunningQuery(qryDesc, qryParams, null, null);
CommandResult res = null;
Exception failReason = null;
try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_CMD_QRY_EXECUTE, MTC.span()))) {
res = cmdProc.runCommand(qryDesc.sql(), cmdNative, cmdH2, qryParams, cliCtx, qryId);
return res.cursor();
}
catch (IgniteException e) {
failReason = e;
throw e;
}
catch (IgniteCheckedException e) {
failReason = e;
throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qryDesc.sql() +
", err=" + e.getMessage() + ']', e);
}
finally {
if (res == null || res.unregisterRunningQuery())
runningQueryManager().unregister(qryId, failReason);
}
}
/**
* Check cluster state.
*/
private void checkClusterState() {
if (!ctx.state().publicApiActiveState(true)) {
throw new IgniteException("Can not perform the operation because the cluster is inactive. Note, " +
"that the cluster is considered inactive by default if Ignite Persistent Store is used to " +
"let all the nodes join the cluster. To activate the cluster call" +
" Ignite.cluster().state(ClusterState.ACTIVE).");
}
}
/** {@inheritDoc} */
@SuppressWarnings({"StringEquality"})
@Override public List<FieldsQueryCursor<List<?>>> querySqlFields(
String schemaName,
SqlFieldsQuery qry,
@Nullable SqlClientContext cliCtx,
boolean keepBinary,
boolean failOnMultipleStmts,
GridQueryCancel cancel
) {
List<FieldsQueryCursor<List<?>>> res = new ArrayList<>(1);
SqlFieldsQuery remainingQry = qry;
while (remainingQry != null) {
Span qrySpan = ctx.tracing().create(SQL_QRY, MTC.span())
.addTag(SQL_SCHEMA, () -> schemaName);
try (TraceSurroundings ignored = MTC.supportContinual(qrySpan)) {
// Parse.
QueryParserResult parseRes = parser.parse(schemaName, remainingQry, !failOnMultipleStmts);
qrySpan.addTag(SQL_QRY_TEXT, () -> parseRes.queryDescriptor().sql());
remainingQry = parseRes.remainingQuery();
// Get next command.
QueryDescriptor newQryDesc = parseRes.queryDescriptor();
QueryParameters newQryParams = parseRes.queryParameters();
// Check if there is enough parameters. Batched statements are not checked at this point
// since they pass parameters differently.
if (!newQryDesc.batched()) {
int qryParamsCnt = F.isEmpty(newQryParams.arguments()) ? 0 : newQryParams.arguments().length;
if (qryParamsCnt < parseRes.parametersCount())
throw new IgniteSQLException("Invalid number of query parameters [expected=" +
parseRes.parametersCount() + ", actual=" + qryParamsCnt + ']');
}
// Check if cluster state is valid.
checkClusterState();
// Execute.
if (parseRes.isCommand()) {
QueryParserResultCommand cmd = parseRes.command();
assert cmd != null;
if (cmd.noOp() && remainingQry == null && newQryDesc.sql().isEmpty())
continue;
FieldsQueryCursor<List<?>> cmdRes = executeCommand(
newQryDesc,
newQryParams,
cliCtx,
cmd
);
res.add(cmdRes);
}
else if (parseRes.isDml()) {
QueryParserResultDml dml = parseRes.dml();
assert dml != null;
List<? extends FieldsQueryCursor<List<?>>> dmlRes = executeDml(
newQryDesc,
newQryParams,
dml,
cancel
);
res.addAll(dmlRes);
}
else {
assert parseRes.isSelect();
QueryParserResultSelect select = parseRes.select();
assert select != null;
List<? extends FieldsQueryCursor<List<?>>> qryRes = executeSelect(
newQryDesc,
newQryParams,
select,
keepBinary,
cancel
);
res.addAll(qryRes);
}
}
catch (Throwable th) {
qrySpan.addTag(ERROR, th::getMessage).end();
throw th;
}
}
if (res.isEmpty())
throw new SqlParseException(qry.getSql(), 0, IgniteQueryErrorCode.PARSING, "Invalid SQL query.");
return res;
}
/**
* Execute an all-ready {@link SqlFieldsQuery}.
*
* @param qryDesc Plan key.
* @param qryParams Parameters.
* @param dml DML.
* @param cancel Query cancel state holder.
* @return Query result.
*/
private List<? extends FieldsQueryCursor<List<?>>> executeDml(
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultDml dml,
GridQueryCancel cancel
) {
IndexingQueryFilter filter = (qryDesc.local() ? backupFilter(null, qryParams.partitions()) : null);
long qryId = registerRunningQuery(qryDesc, qryParams, cancel, dml.statement());
Exception failReason = null;
try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_DML_QRY_EXECUTE, MTC.span()))) {
if (!updateInTxAllowed && ctx.cache().context().tm().inUserTx()) {
throw new IgniteSQLException("DML statements are not allowed inside a transaction over " +
"cache(s) with TRANSACTIONAL atomicity mode (disable this error message with system property " +
"\"-DIGNITE_ALLOW_DML_INSIDE_TRANSACTION=true\")");
}
if (!qryDesc.local()) {
return executeUpdateDistributed(
qryId,
qryDesc,
qryParams,
dml,
cancel
);
}
else {
UpdateResult updRes = executeUpdate(
qryId,
qryDesc,
qryParams,
dml,
true,
filter,
cancel
);
return singletonList(new QueryCursorImpl<>(new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
return new IgniteSingletonIterator<>(singletonList(updRes.counter()));
}
}, cancel, true, false));
}
}
catch (IgniteException e) {
failReason = e;
throw e;
}
catch (IgniteCheckedException e) {
failReason = e;
IgniteClusterReadOnlyException roEx = X.cause(e, IgniteClusterReadOnlyException.class);
if (roEx != null) {
throw new IgniteSQLException(
"Failed to execute DML statement. Cluster in read-only mode [stmt=" + qryDesc.sql() +
", params=" + Arrays.deepToString(qryParams.arguments()) + "]",
IgniteQueryErrorCode.CLUSTER_READ_ONLY_MODE_ENABLED,
e
);
}
throw new IgniteSQLException("Failed to execute DML statement [stmt=" + qryDesc.sql() +
", params=" + Arrays.deepToString(qryParams.arguments()) + "]", e);
}
finally {
runningQueryManager().unregister(qryId, failReason);
}
}
/**
* Execute an all-ready {@link SqlFieldsQuery}.
*
* @param qryDesc Plan key.
* @param qryParams Parameters.
* @param select Select.
* @param keepBinary Whether binary objects must not be deserialized automatically.
* @param cancel Query cancel state holder.
* @return Query result.
*/
private List<? extends FieldsQueryCursor<List<?>>> executeSelect(
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultSelect select,
boolean keepBinary,
GridQueryCancel cancel
) {
assert cancel != null;
// Register query.
long qryId = registerRunningQuery(qryDesc, qryParams, cancel, select.statement());
try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_CURSOR_OPEN, MTC.span()))) {
Iterable<List<?>> iter = executeSelect0(
qryId,
qryDesc,
qryParams,
select,
keepBinary,
cancel,
qryParams.timeout());
RegisteredQueryCursor<List<?>> cursor = new RegisteredQueryCursor<>(iter, cancel, runningQueryManager(),
qryParams.lazy(), qryId, ctx.tracing());
cancel.add(cursor::cancel);
cursor.fieldsMeta(select.meta());
cursor.partitionResult(select.twoStepQuery() != null ? select.twoStepQuery().derivedPartitions() : null);
return singletonList(cursor);
}
catch (Exception e) {
runningQueryManager().unregister(qryId, e);
if (e instanceof IgniteCheckedException)
throw U.convertException((IgniteCheckedException)e);
if (e instanceof RuntimeException)
throw (RuntimeException)e;
throw new IgniteSQLException("Failed to execute SELECT statement: " + qryDesc.sql(), e);
}
}
/**
* Execute SELECT statement for DML.
*
* @param qryId Query id.
* @param schema Schema.
* @param selectQry Select query.
* @param cancel Cancel.
* @param timeout Timeout.
* @return Fields query.
*/
private QueryCursorImpl<List<?>> executeSelectForDml(
long qryId,
String schema,
SqlFieldsQuery selectQry,
GridQueryCancel cancel,
int timeout
) {
QueryParserResult parseRes = parser.parse(schema, selectQry, false);
QueryParserResultSelect select = parseRes.select();
assert select != null;
Iterable<List<?>> iter = executeSelect0(
qryId,
parseRes.queryDescriptor(),
parseRes.queryParameters(),
select,
true,
cancel,
timeout
);
QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(iter, cancel, true, parseRes.queryParameters().lazy());
cursor.fieldsMeta(select.meta());
cursor.partitionResult(select.twoStepQuery() != null ? select.twoStepQuery().derivedPartitions() : null);
return cursor;
}
/**
* Execute an all-ready {@link SqlFieldsQuery}.
*
* @param qryId Query id.
* @param qryDesc Plan key.
* @param qryParams Parameters.
* @param select Select.
* @param keepBinary Whether binary objects must not be deserialized automatically.
* @param cancel Query cancel state holder.
* @param timeout Timeout.
* @return Query result.
*/
private Iterable<List<?>> executeSelect0(
long qryId,
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultSelect select,
boolean keepBinary,
GridQueryCancel cancel,
int timeout
) {
// Check security.
if (ctx.security().enabled())
checkSecurity(select.cacheIds());
Iterable<List<?>> iter;
if (select.splitNeeded()) {
// Distributed query.
GridCacheTwoStepQuery twoStepQry = select.twoStepQuery();
assert twoStepQry != null;
iter = executeSelectDistributed(
qryId,
qryDesc,
qryParams,
twoStepQry,
keepBinary,
cancel,
timeout
);
}
else {
// Local query.
IndexingQueryFilter filter = (qryDesc.local() ? backupFilter(null, qryParams.partitions()) : null);
GridQueryFieldsResult res = executeSelectLocal(
qryId,
qryDesc,
qryParams,
select,
filter,
cancel,
timeout
);
iter = () -> {
try {
return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), keepBinary);
}
catch (IgniteCheckedException | IgniteSQLException e) {
throw new CacheException(e);
}
};
}
return iter;
}
/**
* Register running query.
*
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param cancel Query cancel state holder.
* @param stmnt Parsed statement.
* @return Id of registered query or {@code null} if query wasn't registered.
*/
private long registerRunningQuery(
QueryDescriptor qryDesc,
QueryParameters qryParams,
GridQueryCancel cancel,
@Nullable GridSqlStatement stmnt
) {
String qry = QueryUtils.INCLUDE_SENSITIVE || stmnt == null ? qryDesc.sql() : sqlWithoutConst(stmnt);
long res = runningQueryManager().register(
qry,
GridCacheQueryType.SQL_FIELDS,
qryDesc.schemaName(),
qryDesc.local(),
cancel,
qryDesc.queryInitiatorId(),
qryDesc.enforceJoinOrder(),
qryParams.lazy(),
qryDesc.distributedJoins()
);
if (ctx.event().isRecordable(EVT_SQL_QUERY_EXECUTION)) {
ctx.event().record(new SqlQueryExecutionEvent(
ctx.discovery().localNode(),
GridCacheQueryType.SQL_FIELDS.name() + " query execution.",
qry,
qryParams.arguments(),
ctx.security().enabled() ? ctx.security().securityContext().subject().id() : null));
}
return res;
}
/**
* @param stmnt Statement to print.
* @return SQL query where constant replaced with '?' char.
* @see GridSqlConst#getSQL()
* @see QueryUtils#includeSensitive()
*/
private String sqlWithoutConst(GridSqlStatement stmnt) {
QueryUtils.INCLUDE_SENSITIVE_TL.set(false);
try {
return stmnt.getSQL();
}
finally {
QueryUtils.INCLUDE_SENSITIVE_TL.set(true);
}
}
/**
* Check security access for caches.
*
* @param cacheIds Cache IDs.
*/
private void checkSecurity(Collection<Integer> cacheIds) {
if (F.isEmpty(cacheIds))
return;
for (Integer cacheId : cacheIds) {
DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheId);
if (desc != null)
ctx.security().authorize(desc.cacheName(), SecurityPermission.CACHE_READ);
}
}
/**
* @param lsnr Listener.
*/
public void registerQueryStartedListener(Consumer<GridQueryStartedInfo> lsnr) {
runningQueryManager().registerQueryStartedListener(lsnr);
}
/**
* @param lsnr Listener.
*/
public boolean unregisterQueryStartedListener(Object lsnr) {
return runningQueryManager().unregisterQueryStartedListener(lsnr);
}
/**
* @param lsnr Listener.
*/
public void registerQueryFinishedListener(Consumer<GridQueryFinishedInfo> lsnr) {
runningQueryManager().registerQueryFinishedListener(lsnr);
}
/**
* @param lsnr Listener.
*/
public boolean unregisterQueryFinishedListener(Object lsnr) {
return runningQueryManager().unregisterQueryFinishedListener(lsnr);
}
/**
* Run distributed query on detected set of partitions.
*
* @param qryId Query id.
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param twoStepQry Two-step query.
* @param keepBinary Keep binary flag.
* @param cancel Cancel handler.
* @param timeout Timeout.
* @return Cursor representing distributed query result.
*/
@SuppressWarnings("IfMayBeConditional")
private Iterable<List<?>> executeSelectDistributed(
final long qryId,
final QueryDescriptor qryDesc,
final QueryParameters qryParams,
final GridCacheTwoStepQuery twoStepQry,
final boolean keepBinary,
final GridQueryCancel cancel,
int timeout
) {
// When explicit partitions are set, there must be an owning cache they should be applied to.
PartitionResult derivedParts = twoStepQry.derivedPartitions();
final int[] parts = PartitionResult.calculatePartitions(
qryParams.partitions(),
derivedParts,
qryParams.arguments()
);
Iterable<List<?>> iter;
if (parts != null && parts.length == 0) {
iter = new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
return new Iterator<List<?>>() {
@Override public boolean hasNext() {
return false;
}
@SuppressWarnings("IteratorNextCanNotThrowNoSuchElementException")
@Override public List<?> next() {
return null;
}
};
}
};
}
else {
iter = new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_ITER_OPEN, MTC.span()))) {
return IgniteH2Indexing.this.rdcQryExec.query(
qryId,
qryDesc.schemaName(),
twoStepQry,
keepBinary,
qryDesc.enforceJoinOrder(),
timeout,
cancel,
qryParams.arguments(),
parts,
qryParams.lazy(),
qryParams.dataPageScanEnabled(),
qryParams.pageSize()
);
}
}
};
}
return iter;
}
/**
* Executes DML request on map node. Happens only for "skip reducer" mode.
*
* @param schemaName Schema name.
* @param qry Query.
* @param filter Filter.
* @param cancel Cancel state.
* @param loc Locality flag.
* @return Update result.
* @throws IgniteCheckedException if failed.
*/
public UpdateResult executeUpdateOnDataNode(
String schemaName,
SqlFieldsQuery qry,
IndexingQueryFilter filter,
GridQueryCancel cancel,
boolean loc
) throws IgniteCheckedException {
QueryParserResult parseRes = parser.parse(schemaName, qry, false);
assert parseRes.remainingQuery() == null;
QueryParserResultDml dml = parseRes.dml();
assert dml != null;
return executeUpdate(
RunningQueryManager.UNDEFINED_QUERY_ID,
parseRes.queryDescriptor(),
parseRes.queryParameters(),
dml,
loc,
filter,
cancel
);
}
/** {@inheritDoc} */
@Override public boolean isStreamableInsertStatement(String schemaName, SqlFieldsQuery qry) throws SQLException {
QueryParserResult parsed = parser.parse(schemaName, qry, true);
return parsed.isDml() && parsed.dml().streamable() && parsed.remainingQuery() == null;
}
/**
* @return Busy lock.
*/
public GridSpinBusyLock busyLock() {
return busyLock;
}
/**
* @return Map query executor.
*/
public GridMapQueryExecutor mapQueryExecutor() {
return mapQryExec;
}
/**
* @return Reduce query executor.
*/
public GridReduceQueryExecutor reduceQueryExecutor() {
return rdcQryExec;
}
/** {@inheritDoc} */
@Override public RunningQueryManager runningQueryManager() {
return ctx.query().runningQueryManager();
}
/** {@inheritDoc} */
@SuppressWarnings({"deprecation", "AssignmentToStaticFieldFromInstanceMethod"})
@Override public void start(GridKernalContext ctx, GridSpinBusyLock busyLock) throws IgniteCheckedException {
if (log.isDebugEnabled())
log.debug("Starting cache query index...");
// During this check we also load necessary spatial index utils class.
if (H2Utils.checkSpatialIndexEnabled() && log.isDebugEnabled())
log.debug("Spatial indexes are enabled.");
this.busyLock = busyLock;
if (SysProperties.serializeJavaObject) {
U.warn(log, "Serialization of Java objects in H2 was enabled.");
SysProperties.serializeJavaObject = false;
}
this.ctx = ctx;
partReservationMgr = new PartitionReservationManager(ctx);
connMgr = new ConnectionManager(ctx);
heavyQryTracker = ctx.query().runningQueryManager().heavyQueriesTracker();
parser = new QueryParser(this, connMgr, cmd -> cmdProc.isCommandSupported(cmd));
schemaMgr = new H2SchemaManager(ctx, this, connMgr);
schemaMgr.start();
nodeId = ctx.localNodeId();
marshaller = ctx.config().getMarshaller();
mapQryExec = new GridMapQueryExecutor();
rdcQryExec = new GridReduceQueryExecutor();
mapQryExec.start(ctx, this);
rdcQryExec.start(ctx, this);
discoLsnr = evt -> {
mapQryExec.onNodeLeft((DiscoveryEvent)evt);
rdcQryExec.onNodeLeft((DiscoveryEvent)evt);
};
ctx.event().addLocalEventListener(discoLsnr, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
qryLsnr = (nodeId, msg, plc) -> onMessage(nodeId, msg);
ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, qryLsnr);
partExtractor = new PartitionExtractor(new H2PartitionResolver(this), ctx);
cmdProc = new CommandProcessor(ctx, schemaMgr, this);
if (JdbcUtils.serializer != null)
U.warn(log, "Custom H2 serialization is already configured, will override.");
JdbcUtils.serializer = h2Serializer();
distrCfg = new DistributedIndexingConfiguration(ctx, log);
funcMgr = new FunctionsManager(distrCfg);
// Setup default index key type settings.
CompareMode compareMode = connMgr.dataHandler().getCompareMode();
ctx.indexProcessor().keyTypeSettings()
.stringOptimizedCompare(CompareMode.OFF.equals(compareMode.getName()))
.binaryUnsigned(compareMode.isBinaryUnsigned());
ctx.internalSubscriptionProcessor().registerSchemaChangeListener(new AbstractSchemaChangeListener() {
/** */
@Override public void onColumnsAdded(
String schemaName,
GridQueryTypeDescriptor typeDesc,
GridCacheContextInfo<?, ?> cacheInfo,
List<QueryField> cols
) {
clearPlanCache();
}
/** */
@Override public void onColumnsDropped(
String schemaName,
GridQueryTypeDescriptor typeDesc,
GridCacheContextInfo<?, ?> cacheInfo,
List<String> cols
) {
clearPlanCache();
}
});
}
/**
* @param nodeId Node ID.
* @param msg Message.
*/
public void onMessage(UUID nodeId, Object msg) {
assert msg != null;
ClusterNode node = ctx.discovery().node(nodeId);
if (node == null)
return; // Node left, ignore.
if (!busyLock.enterBusy())
return;
try {
if (msg instanceof GridCacheQueryMarshallable)
((GridCacheQueryMarshallable)msg).unmarshall(ctx.config().getMarshaller(), ctx);
try {
boolean processed = true;
boolean tracebleMsg = false;
if (msg instanceof GridQueryNextPageRequest) {
mapQueryExecutor().onNextPageRequest(node, (GridQueryNextPageRequest)msg);
tracebleMsg = true;
}
else if (msg instanceof GridQueryNextPageResponse) {
reduceQueryExecutor().onNextPage(node, (GridQueryNextPageResponse)msg);
tracebleMsg = true;
}
else if (msg instanceof GridH2QueryRequest)
mapQueryExecutor().onQueryRequest(node, (GridH2QueryRequest)msg);
else if (msg instanceof GridH2DmlRequest)
mapQueryExecutor().onDmlRequest(node, (GridH2DmlRequest)msg);
else if (msg instanceof GridH2DmlResponse)
reduceQueryExecutor().onDmlResponse(node, (GridH2DmlResponse)msg);
else if (msg instanceof GridQueryFailResponse)
reduceQueryExecutor().onFail(node, (GridQueryFailResponse)msg);
else if (msg instanceof GridQueryCancelRequest)
mapQueryExecutor().onCancel(node, (GridQueryCancelRequest)msg);
else
processed = false;
if (processed && log.isDebugEnabled() && (!tracebleMsg || log.isTraceEnabled()))
log.debug("Processed message: [srcNodeId=" + nodeId + ", msg=" + msg + ']');
}
catch (Throwable th) {
U.error(log, "Failed to process message: [srcNodeId=" + nodeId + ", msg=" + msg + ']', th);
}
}
finally {
busyLock.leaveBusy();
}
}
/**
* @return Value object context.
*/
public CacheObjectValueContext objectContext() {
return ctx.query().objectContext();
}
/**
* @param topic Topic.
* @param topicOrd Topic ordinal for {@link GridTopic}.
* @param nodes Nodes.
* @param msg Message.
* @param specialize Optional closure to specialize message for each node.
* @param locNodeHnd Handler for local node.
* @param plc Policy identifying the executor service which will process message.
* @param runLocParallel Run local handler in parallel thread.
* @return {@code true} If all messages sent successfully.
*/
public boolean send(
Object topic,
int topicOrd,
Collection<ClusterNode> nodes,
Message msg,
@Nullable IgniteBiClosure<ClusterNode, Message, Message> specialize,
@Nullable final IgniteInClosure2X<ClusterNode, Message> locNodeHnd,
byte plc,
boolean runLocParallel
) {
boolean ok = true;
if (specialize == null && msg instanceof GridCacheQueryMarshallable)
((GridCacheQueryMarshallable)msg).marshall(marshaller);
ClusterNode locNode = null;
for (ClusterNode node : nodes) {
if (node.isLocal()) {
if (locNode != null)
throw new IllegalStateException();
locNode = node;
continue;
}
try {
if (specialize != null) {
msg = specialize.apply(node, msg);
if (msg instanceof GridCacheQueryMarshallable)
((GridCacheQueryMarshallable)msg).marshall(marshaller);
}
ctx.io().sendGeneric(node, topic, topicOrd, msg, plc);
}
catch (IgniteCheckedException e) {
ok = false;
U.warn(log, "Failed to send message [node=" + node + ", msg=" + msg +
", errMsg=" + e.getMessage() + "]");
}
}
// Local node goes the last to allow parallel execution.
if (locNode != null) {
assert locNodeHnd != null;
if (specialize != null) {
msg = specialize.apply(locNode, msg);
if (msg instanceof GridCacheQueryMarshallable)
((GridCacheQueryMarshallable)msg).marshall(marshaller);
}
if (runLocParallel) {
final ClusterNode finalLocNode = locNode;
final Message finalMsg = msg;
try {
// We prefer runLocal to runLocalSafe, because the latter can produce deadlock here.
ctx.closure().runLocal(new GridPlainRunnable() {
@Override public void run() {
if (!busyLock.enterBusy())
return;
try {
locNodeHnd.apply(finalLocNode, finalMsg);
}
finally {
busyLock.leaveBusy();
}
}
}, plc).listen(logger);
}
catch (IgniteCheckedException e) {
ok = false;
U.error(log, "Failed to execute query locally.", e);
}
}
else
locNodeHnd.apply(locNode, msg);
}
return ok;
}
/**
* @return Serializer.
*/
private JavaObjectSerializer h2Serializer() {
return new H2JavaObjectSerializer();
}
/** {@inheritDoc} */
@Override public void stop() {
if (log.isDebugEnabled())
log.debug("Stopping cache query index...");
mapQryExec.stop();
qryCtxRegistry.clearSharedOnLocalNodeStop();
connMgr.stop();
if (log.isDebugEnabled())
log.debug("Cache query index stopped.");
}
/** {@inheritDoc} */
@Override public void onClientDisconnect() {
// No-op.
}
/** {@inheritDoc} */
@Override public void registerCache(String cacheName, String schemaName, GridCacheContextInfo<?, ?> cacheInfo) {
// No-op.
}
/** {@inheritDoc} */
@Override public void unregisterCache(GridCacheContextInfo<?, ?> cacheInfo) {
String cacheName = cacheInfo.name();
partReservationMgr.onCacheStop(cacheName);
// Unregister connection.
connMgr.onCacheDestroyed();
// Clear query cache.
clearPlanCache();
}
/**
* Remove all cached queries from cached two-steps queries.
*/
private void clearPlanCache() {
parser.clearCache();
}
/** {@inheritDoc} */
@Override public IndexingQueryFilter backupFilter(@Nullable final AffinityTopologyVersion topVer,
@Nullable final int[] parts) {
return backupFilter(topVer, parts, false);
}
/**
* Returns backup filter.
*
* @param topVer Topology version.
* @param parts Partitions.
* @param treatReplicatedAsPartitioned true if need to treat replicated as partitioned (for outer joins).
* @return Backup filter.
*/
public IndexingQueryFilter backupFilter(@Nullable final AffinityTopologyVersion topVer, @Nullable final int[] parts,
boolean treatReplicatedAsPartitioned) {
return new IndexingQueryFilterImpl(ctx, topVer, parts, treatReplicatedAsPartitioned);
}
/**
* @return Ready topology version.
*/
public AffinityTopologyVersion readyTopologyVersion() {
return ctx.cache().context().exchange().readyAffinityVersion();
}
/**
* @param readyVer Ready topology version.
*
* @return {@code true} If pending distributed exchange exists because server topology is changed.
*/
public boolean serverTopologyChanged(AffinityTopologyVersion readyVer) {
GridDhtPartitionsExchangeFuture fut = ctx.cache().context().exchange().lastTopologyFuture();
if (fut.isDone())
return false;
AffinityTopologyVersion initVer = fut.initialVersion();
return initVer.compareTo(readyVer) > 0 && !fut.firstEvent().node().isClient();
}
/**
* @param topVer Topology version.
* @throws IgniteCheckedException If failed.
*/
public void awaitForReadyTopologyVersion(AffinityTopologyVersion topVer) throws IgniteCheckedException {
ctx.cache().context().exchange().affinityReadyFuture(topVer).get();
}
/** {@inheritDoc} */
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
rdcQryExec.onDisconnected(reconnectFut);
}
/** {@inheritDoc} */
@Override public void onKernalStop() {
connMgr.onKernalStop();
ctx.io().removeMessageListener(GridTopic.TOPIC_QUERY, qryLsnr);
ctx.event().removeLocalEventListener(discoLsnr);
}
/**
* @return Query context registry.
*/
public QueryContextRegistry queryContextRegistry() {
return qryCtxRegistry;
}
/**
* @return Connection manager.
*/
public ConnectionManager connections() {
return connMgr;
}
/**
* @return Parser.
*/
public QueryParser parser() {
return parser;
}
/**
* @return Schema manager.
*/
public H2SchemaManager schemaManager() {
return schemaMgr;
}
/**
* @return Partition extractor.
*/
public PartitionExtractor partitionExtractor() {
return partExtractor;
}
/**
* @return Partition reservation manager.
*/
public PartitionReservationManager partitionReservationManager() {
return partReservationMgr;
}
/**
* @param qryId Query id.
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param dml DML statement.
* @param cancel Query cancel.
* @return Update result wrapped into {@link GridQueryFieldsResult}
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings("unchecked")
private List<QueryCursorImpl<List<?>>> executeUpdateDistributed(
long qryId,
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultDml dml,
GridQueryCancel cancel
) throws IgniteCheckedException {
if (qryDesc.batched()) {
Collection<UpdateResult> ress;
List<Object[]> argss = qryParams.batchedArguments();
UpdatePlan plan = dml.plan();
GridCacheContext<?, ?> cctx = plan.cacheContext();
// For MVCC case, let's enlist batch elements one by one.
if (plan.hasRows() && plan.mode() == UpdateMode.INSERT) {
CacheOperationContext opCtx = DmlUtils.setKeepBinaryContext(cctx);
try {
List<List<List<?>>> cur = plan.createRows(argss);
//TODO: IGNITE-11176 - Need to support cancellation
ress = DmlUtils.processSelectResultBatched(plan, cur, qryParams.updateBatchSize());
}
finally {
DmlUtils.restoreKeepBinaryContext(cctx, opCtx);
}
}
else {
// Fallback to previous mode.
ress = new ArrayList<>(argss.size());
SQLException batchEx = null;
int[] cntPerRow = new int[argss.size()];
int cntr = 0;
for (Object[] args : argss) {
UpdateResult res;
try {
res = executeUpdate(
qryId,
qryDesc,
qryParams.toSingleBatchedArguments(args),
dml,
false,
null,
cancel
);
cntPerRow[cntr++] = (int)res.counter();
ress.add(res);
}
catch (Exception e ) {
SQLException sqlEx = QueryUtils.toSqlException(e);
batchEx = DmlUtils.chainException(batchEx, sqlEx);
cntPerRow[cntr++] = Statement.EXECUTE_FAILED;
}
}
if (batchEx != null) {
BatchUpdateException e = new BatchUpdateException(batchEx.getMessage(),
batchEx.getSQLState(), batchEx.getErrorCode(), cntPerRow, batchEx);
throw new IgniteCheckedException(e);
}
}
ArrayList<QueryCursorImpl<List<?>>> resCurs = new ArrayList<>(ress.size());
for (UpdateResult res : ress) {
res.throwIfError();
QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(singletonList(
singletonList(res.counter())), cancel, false, false);
resCur.fieldsMeta(UPDATE_RESULT_META);
resCurs.add(resCur);
}
return resCurs;
}
else {
UpdateResult res = executeUpdate(
qryId,
qryDesc,
qryParams,
dml,
false,
null,
cancel
);
res.throwIfError();
QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(singletonList(
singletonList(res.counter())), cancel, false, false);
resCur.fieldsMeta(UPDATE_RESULT_META);
resCur.partitionResult(res.partitionResult());
return singletonList(resCur);
}
}
/**
* Execute DML statement, possibly with few re-attempts in case of concurrent data modifications.
*
* @param qryId Query id.
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param dml DML command.
* @param loc Query locality flag.
* @param filters Cache name and key filter.
* @param cancel Cancel.
* @return Update result (modified items count and failed keys).
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings("IfMayBeConditional")
private UpdateResult executeUpdate(
long qryId,
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultDml dml,
boolean loc,
IndexingQueryFilter filters,
GridQueryCancel cancel
) throws IgniteCheckedException {
Object[] errKeys = null;
long items = 0;
PartitionResult partRes = null;
GridCacheContext<?, ?> cctx = dml.plan().cacheContext();
for (int i = 0; i < DFLT_UPDATE_RERUN_ATTEMPTS; i++) {
CacheOperationContext opCtx = cctx != null ? DmlUtils.setKeepBinaryContext(cctx) : null;
UpdateResult r;
try {
r = executeUpdate0(
qryId,
qryDesc,
qryParams,
dml,
loc,
filters,
cancel
);
}
finally {
if (opCtx != null)
DmlUtils.restoreKeepBinaryContext(cctx, opCtx);
}
items += r.counter();
errKeys = r.errorKeys();
partRes = r.partitionResult();
if (F.isEmpty(errKeys))
break;
}
if (F.isEmpty(errKeys) && partRes == null) {
if (items == 1L)
return UpdateResult.ONE;
else if (items == 0L)
return UpdateResult.ZERO;
}
return new UpdateResult(items, errKeys, partRes);
}
/**
* Execute update.
*
* @param qryId Query id.
* @param qryDesc Query descriptor.
* @param qryParams Query parameters.
* @param dml Plan.
* @param loc Local flag.
* @param filters Filters.
* @param cancel Cancel hook.
* @return Update result.
* @throws IgniteCheckedException If failed.
*/
private UpdateResult executeUpdate0(
long qryId,
QueryDescriptor qryDesc,
QueryParameters qryParams,
QueryParserResultDml dml,
boolean loc,
IndexingQueryFilter filters,
GridQueryCancel cancel
) throws IgniteCheckedException {
UpdatePlan plan = dml.plan();
UpdateResult fastUpdateRes = plan.processFast(qryParams.arguments());
if (fastUpdateRes != null)
return fastUpdateRes;
DmlDistributedPlanInfo distributedPlan = loc ? null : plan.distributedPlan();
if (distributedPlan != null) {
if (cancel == null)
cancel = new GridQueryCancel();
UpdateResult result = rdcQryExec.update(
qryDesc.schemaName(),
distributedPlan.getCacheIds(),
qryDesc.sql(),
qryParams.arguments(),
qryDesc.enforceJoinOrder(),
qryParams.pageSize(),
qryParams.timeout(),
qryParams.partitions(),
distributedPlan.isReplicatedOnly(),
cancel
);
// Null is returned in case not all nodes support distributed DML.
if (result != null)
return result;
}
final GridQueryCancel selectCancel = (cancel != null) ? new GridQueryCancel() : null;
if (cancel != null)
cancel.add(selectCancel::cancel);
SqlFieldsQuery selectFieldsQry = new SqlFieldsQuery(plan.selectQuery(), qryDesc.collocated())
.setArgs(qryParams.arguments())
.setDistributedJoins(qryDesc.distributedJoins())
.setEnforceJoinOrder(qryDesc.enforceJoinOrder())
.setLocal(qryDesc.local())
.setPageSize(qryParams.pageSize())
.setTimeout(qryParams.timeout(), TimeUnit.MILLISECONDS)
// We cannot use lazy mode when UPDATE query contains updated columns
// in WHERE condition because it may be cause of update one entry several times
// (when index for such columns is selected for scan):
// e.g. : UPDATE test SET val = val + 1 WHERE val >= ?
.setLazy(qryParams.lazy() && plan.canSelectBeLazy());
Iterable<List<?>> cur;
// Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual
// sub-query and not some dummy stuff like "select 1, 2, 3;"
if (!loc && !plan.isLocalSubquery()) {
assert !F.isEmpty(plan.selectQuery());
cur = executeSelectForDml(
qryId,
qryDesc.schemaName(),
selectFieldsQry,
selectCancel,
qryParams.timeout()
);
}
else if (plan.hasRows())
cur = plan.createRows(qryParams.arguments());
else {
selectFieldsQry.setLocal(true);
QueryParserResult selectParseRes = parser.parse(qryDesc.schemaName(), selectFieldsQry, false);
final GridQueryFieldsResult res = executeSelectLocal(
qryId,
selectParseRes.queryDescriptor(),
selectParseRes.queryParameters(),
selectParseRes.select(),
filters,
selectCancel,
qryParams.timeout()
);
cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
try {
return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), true);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
}, cancel, true, qryParams.lazy());
}
int pageSize = qryParams.updateBatchSize();
// TODO: IGNITE-11176 - Need to support cancellation
try {
return DmlUtils.processSelectResult(plan, cur, pageSize);
}
finally {
if (cur instanceof AutoCloseable)
U.closeQuiet((AutoCloseable)cur);
}
}
/**
* @return Heavy queries tracker.
*/
public HeavyQueriesTracker heavyQueriesTracker() {
return heavyQryTracker;
}
/**
* @return Distributed SQL configuration.
*/
public DistributedIndexingConfiguration distributedConfiguration() {
return distrCfg;
}
}