| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.query.h2; |
| |
| import java.sql.BatchUpdateException; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Consumer; |
| import javax.cache.CacheException; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteDataStreamer; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.cache.query.FieldsQueryCursor; |
| import org.apache.ignite.cache.query.QueryCancelledException; |
| import org.apache.ignite.cache.query.SqlFieldsQuery; |
| import org.apache.ignite.cache.query.SqlQuery; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.events.DiscoveryEvent; |
| import org.apache.ignite.events.EventType; |
| import org.apache.ignite.events.SqlQueryExecutionEvent; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.GridTopic; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.binary.BinaryMarshaller; |
| import org.apache.ignite.internal.binary.BinaryUtils; |
| import org.apache.ignite.internal.managers.communication.GridMessageListener; |
| import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; |
| import org.apache.ignite.internal.processors.cache.CacheOperationContext; |
| import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheContextInfo; |
| import org.apache.ignite.internal.processors.cache.QueryCursorImpl; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; |
| import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; |
| import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; |
| import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; |
| import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; |
| import org.apache.ignite.internal.processors.cache.tree.CacheDataTree; |
| import org.apache.ignite.internal.processors.odbc.jdbc.JdbcParameterMeta; |
| import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator; |
| import org.apache.ignite.internal.processors.query.GridQueryCancel; |
| import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; |
| import org.apache.ignite.internal.processors.query.GridQueryFieldsResult; |
| import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter; |
| import org.apache.ignite.internal.processors.query.GridQueryFinishedInfo; |
| import org.apache.ignite.internal.processors.query.GridQueryIndexing; |
| import org.apache.ignite.internal.processors.query.GridQueryStartedInfo; |
| import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; |
| import org.apache.ignite.internal.processors.query.IgniteSQLException; |
| import org.apache.ignite.internal.processors.query.QueryField; |
| import org.apache.ignite.internal.processors.query.QueryUtils; |
| import org.apache.ignite.internal.processors.query.SqlClientContext; |
| import org.apache.ignite.internal.processors.query.h2.affinity.H2PartitionResolver; |
| import org.apache.ignite.internal.processors.query.h2.affinity.PartitionExtractor; |
| import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo; |
| import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils; |
| import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode; |
| import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan; |
| import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; |
| import org.apache.ignite.internal.processors.query.h2.opt.QueryContext; |
| import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry; |
| import org.apache.ignite.internal.processors.query.h2.sql.GridSqlConst; |
| import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement; |
| import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor; |
| import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor; |
| import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager; |
| import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest; |
| import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse; |
| import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest; |
| import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse; |
| import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlRequest; |
| import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse; |
| import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; |
| import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker; |
| import org.apache.ignite.internal.processors.query.running.RunningQueryManager; |
| import org.apache.ignite.internal.processors.query.schema.AbstractSchemaChangeListener; |
| import org.apache.ignite.internal.processors.tracing.MTC; |
| import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings; |
| import org.apache.ignite.internal.processors.tracing.Span; |
| import org.apache.ignite.internal.sql.SqlParseException; |
| import org.apache.ignite.internal.sql.command.SqlCommand; |
| import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult; |
| import org.apache.ignite.internal.util.GridEmptyCloseableIterator; |
| import org.apache.ignite.internal.util.GridSpinBusyLock; |
| import org.apache.ignite.internal.util.lang.GridCloseableIterator; |
| import org.apache.ignite.internal.util.lang.GridPlainRunnable; |
| import org.apache.ignite.internal.util.lang.IgniteInClosure2X; |
| import org.apache.ignite.internal.util.lang.IgniteSingletonIterator; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiClosure; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgniteFuture; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.marshaller.Marshaller; |
| import org.apache.ignite.plugin.extensions.communication.Message; |
| import org.apache.ignite.plugin.security.SecurityPermission; |
| import org.apache.ignite.resources.LoggerResource; |
| import org.apache.ignite.spi.indexing.IndexingQueryFilter; |
| import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl; |
| import org.h2.api.ErrorCode; |
| import org.h2.api.JavaObjectSerializer; |
| import org.h2.engine.Session; |
| import org.h2.engine.SysProperties; |
| import org.h2.util.JdbcUtils; |
| import org.h2.value.CompareMode; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static java.util.Collections.singletonList; |
| import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION; |
| import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT; |
| import static org.apache.ignite.internal.processors.query.h2.H2Utils.UPDATE_RESULT_META; |
| import static org.apache.ignite.internal.processors.query.h2.H2Utils.generateFieldsQueryString; |
| import static org.apache.ignite.internal.processors.query.h2.H2Utils.session; |
| import static org.apache.ignite.internal.processors.query.h2.H2Utils.zeroCursor; |
| import static org.apache.ignite.internal.processors.tracing.SpanTags.ERROR; |
| import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_QRY_TEXT; |
| import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_SCHEMA; |
| import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_CMD_QRY_EXECUTE; |
| import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_CURSOR_OPEN; |
| import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_DML_QRY_EXECUTE; |
| import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_ITER_OPEN; |
| import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_QRY; |
| import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_QRY_EXECUTE; |
| |
| /** |
| * Indexing implementation based on H2 database engine. In this implementation main query language is SQL, |
| * fulltext indexing can be performed using Lucene. |
| * <p> |
| * For each registered {@link GridQueryTypeDescriptor} this SPI will create respective SQL table with |
| * {@code '_key'} and {@code '_val'} fields for key and value, and fields from |
| * {@link GridQueryTypeDescriptor#fields()}. |
| * For each table it will create indexes declared in {@link GridQueryTypeDescriptor#indexes()}. |
| */ |
| public class IgniteH2Indexing implements GridQueryIndexing { |
    /** Default number of attempts to re-run DELETE and UPDATE queries in case of concurrent modifications of values. */
    private static final int DFLT_UPDATE_RERUN_ATTEMPTS = 4;

    /** Cached value of {@code IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION}. */
    private final boolean updateInTxAllowed =
        Boolean.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION);

    static {
        // Required to skip checks of forbidden H2 settings, otherwise Ignite fails to start.
        //
        // Note, H2 system properties must be overridden here, because the properties are finalized while the class
        // org.h2.engine.SysProperties is loaded in the IgniteH2Indexing.start(...) method.
        //
        // @see ConnectionManager#forbidH2DbSettings(String...)
        System.setProperty("h2.check", "false");
    }

    /** Logger. */
    @LoggerResource
    private IgniteLogger log;

    /** Node ID. */
    private UUID nodeId;

    /** Marshaller. */
    private Marshaller marshaller;

    /** Map query executor (map side of two-step distributed queries). */
    private GridMapQueryExecutor mapQryExec;

    /** Reduce query executor (reduce side of two-step distributed queries). */
    private GridReduceQueryExecutor rdcQryExec;

    /** Busy lock. */
    private GridSpinBusyLock busyLock;

    /** Kernal context. */
    protected volatile GridKernalContext ctx;

    /** Query context registry. */
    private final QueryContextRegistry qryCtxRegistry = new QueryContextRegistry();

    /** Processor to execute commands which are neither SELECT, nor DML. */
    private CommandProcessor cmdProc;

    /** Partition reservation manager. */
    private PartitionReservationManager partReservationMgr;

    /** Partition extractor. */
    private PartitionExtractor partExtractor;

    /** Query parser. */
    private QueryParser parser;
| |
| /** */ |
| private final IgniteInClosure<? super IgniteInternalFuture<?>> logger = new IgniteInClosure<IgniteInternalFuture<?>>() { |
| @Override public void apply(IgniteInternalFuture<?> fut) { |
| try { |
| fut.get(); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, e.getMessage(), e); |
| } |
| } |
| }; |
| |
    /** Connection manager. */
    private ConnectionManager connMgr;

    /** Schema manager. */
    private H2SchemaManager schemaMgr;

    /** Heavy queries tracker. */
    private HeavyQueriesTracker heavyQryTracker;

    /** Discovery event listener. */
    private GridLocalEventListener discoLsnr;

    /** Query message listener. */
    private GridMessageListener qryLsnr;

    /** Distributed config. */
    private DistributedIndexingConfiguration distrCfg;

    /** Functions manager. */
    private FunctionsManager funcMgr;
| |
| /** |
| * @return Kernal context. |
| */ |
| public GridKernalContext kernalContext() { |
| return ctx; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public List<JdbcParameterMeta> parameterMetaData(String schemaName, SqlFieldsQuery qry) |
| throws IgniteSQLException { |
| assert qry != null; |
| |
| ArrayList<JdbcParameterMeta> metas = new ArrayList<>(); |
| |
| SqlFieldsQuery curQry = qry; |
| |
| while (curQry != null) { |
| QueryParserResult parsed = parser.parse(schemaName, curQry, true); |
| |
| metas.addAll(parsed.parametersMeta()); |
| |
| curQry = parsed.remainingQuery(); |
| } |
| |
| return metas; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public List<GridQueryFieldMetadata> resultMetaData(String schemaName, SqlFieldsQuery qry) |
| throws IgniteSQLException { |
| QueryParserResult parsed = parser.parse(schemaName, qry, true); |
| |
| if (parsed.remainingQuery() != null) |
| return null; |
| |
| if (parsed.isSelect()) |
| return parsed.select().meta(); |
| |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void store(GridCacheContext cctx, |
| GridQueryTypeDescriptor type, |
| CacheDataRow row, |
| @Nullable CacheDataRow prevRow, |
| boolean prevRowAvailable |
| ) throws IgniteCheckedException { |
| GridH2Table tbl = schemaMgr.dataTable(type.schemaName(), type.tableName()); |
| |
| if (tbl == null) |
| return; // Type was rejected. |
| |
| if (tbl.tableDescriptor().luceneIndex() != null) { |
| long expireTime = row.expireTime(); |
| |
| if (expireTime == 0L) |
| expireTime = Long.MAX_VALUE; |
| |
| tbl.tableDescriptor().luceneIndex().store(row.key(), row.value(), row.version(), expireTime); |
| } |
| |
| tbl.update(row, prevRow); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void remove(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow row) |
| throws IgniteCheckedException { |
| if (log.isDebugEnabled()) { |
| log.debug("Removing key from cache query index [locId=" + nodeId + |
| ", key=" + row.key() + |
| ", val=" + row.value() + ']'); |
| } |
| |
| GridH2Table tbl = schemaMgr.dataTable(type.schemaName(), type.tableName()); |
| |
| if (tbl == null) |
| return; |
| |
| if (tbl.tableDescriptor().luceneIndex() != null) |
| tbl.tableDescriptor().luceneIndex().remove(row.key()); |
| |
| tbl.remove(row); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText(String schemaName, |
| String cacheName, String qry, String typeName, IndexingQueryFilter filters, int limit) throws IgniteCheckedException { |
| H2TableDescriptor tbl = schemaMgr.tableForType(schemaName, cacheName, typeName); |
| |
| if (tbl != null && tbl.luceneIndex() != null) { |
| long qryId = runningQueryManager().register( |
| qry, |
| TEXT, |
| schemaName, |
| true, |
| null, |
| null, |
| false, |
| false, |
| false |
| ); |
| |
| Throwable failReason = null; |
| try { |
| return tbl.luceneIndex().query(qry.toUpperCase(), filters, limit); |
| } |
| catch (Throwable t) { |
| failReason = t; |
| |
| throw t; |
| } |
| finally { |
| runningQueryManager().unregister(qryId, failReason); |
| } |
| } |
| |
| return new GridEmptyCloseableIterator<>(); |
| } |
| |
| /** |
| * Queries individual fields (generally used by JDBC drivers). |
| * |
| * @param qryId Query id. |
| * @param qryDesc Query descriptor. |
| * @param qryParams Query parameters. |
| * @param select Select. |
| * @param filter Cache name and key filter. |
| * @param cancel Query cancel. |
| * @param timeout Timeout. |
| * @return Query result. |
| */ |
| private GridQueryFieldsResult executeSelectLocal( |
| long qryId, |
| QueryDescriptor qryDesc, |
| QueryParameters qryParams, |
| QueryParserResultSelect select, |
| final IndexingQueryFilter filter, |
| GridQueryCancel cancel, |
| int timeout |
| ) { |
| String qry = qryDesc.sql(); |
| |
| assert select != null; |
| |
| if (ctx.security().enabled()) |
| checkSecurity(select.cacheIds()); |
| |
| final QueryContext qctx = new QueryContext( |
| 0, |
| filter, |
| null, |
| null, |
| null, |
| true |
| ); |
| |
| return new GridQueryFieldsResultAdapter(select.meta(), null) { |
| @Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException { |
| H2PooledConnection conn = connections().connection(qryDesc.schemaName()); |
| |
| try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_ITER_OPEN, MTC.span()))) { |
| H2Utils.setupConnection(conn, qctx, |
| qryDesc.distributedJoins(), qryDesc.enforceJoinOrder(), qryParams.lazy()); |
| |
| PreparedStatement stmt = conn.prepareStatement(qry, H2StatementCache.queryFlags(qryDesc)); |
| |
| // Convert parameters into BinaryObjects. |
| Marshaller m = ctx.config().getMarshaller(); |
| byte[] paramsBytes = U.marshal(m, qryParams.arguments()); |
| final ClassLoader ldr = U.resolveClassLoader(ctx.config()); |
| |
| Object[] params; |
| |
| if (m instanceof BinaryMarshaller) { |
| params = BinaryUtils.rawArrayFromBinary(((BinaryMarshaller)m).binaryMarshaller() |
| .unmarshal(paramsBytes, ldr)); |
| } |
| else |
| params = U.unmarshal(m, paramsBytes, ldr); |
| |
| H2Utils.bindParameters(stmt, F.asList(params)); |
| |
| H2QueryInfo qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry, |
| ctx.localNodeId(), qryId); |
| |
| if (ctx.performanceStatistics().enabled()) { |
| ctx.performanceStatistics().queryProperty( |
| GridCacheQueryType.SQL_FIELDS, |
| qryInfo.nodeId(), |
| qryInfo.queryId(), |
| "Local plan", |
| qryInfo.plan() |
| ); |
| } |
| |
| ResultSet rs = executeSqlQueryWithTimer( |
| stmt, |
| conn, |
| qry, |
| timeout, |
| cancel, |
| qryParams.dataPageScanEnabled(), |
| qryInfo |
| ); |
| |
| return new H2FieldsIterator( |
| rs, |
| conn, |
| qryParams.pageSize(), |
| log, |
| IgniteH2Indexing.this, |
| qryInfo, |
| ctx.tracing() |
| ); |
| } |
| catch (IgniteCheckedException | RuntimeException | Error e) { |
| conn.close(); |
| |
| throw e; |
| } |
| } |
| }; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long streamUpdateQuery( |
| String schemaName, |
| String qry, |
| @Nullable Object[] params, |
| IgniteDataStreamer<?, ?> streamer, |
| String qryInitiatorId |
| ) throws IgniteCheckedException { |
| QueryParserResultDml dml = streamerParse(schemaName, qry); |
| |
| return streamQuery0(qry, schemaName, streamer, dml, params, qryInitiatorId); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings({"ForLoopReplaceableByForEach", "ConstantConditions"}) |
| @Override public List<Long> streamBatchedUpdateQuery( |
| String schemaName, |
| String qry, |
| List<Object[]> params, |
| SqlClientContext cliCtx, |
| String qryInitiatorId |
| ) throws IgniteCheckedException { |
| if (cliCtx == null || !cliCtx.isStream()) { |
| U.warn(log, "Connection is not in streaming mode."); |
| |
| return zeroBatchedStreamedUpdateResult(params.size()); |
| } |
| |
| QueryParserResultDml dml = streamerParse(schemaName, qry); |
| |
| IgniteDataStreamer<?, ?> streamer = cliCtx.streamerForCache(dml.streamTable().cacheName()); |
| |
| assert streamer != null; |
| |
| List<Long> ress = new ArrayList<>(params.size()); |
| |
| for (int i = 0; i < params.size(); i++) { |
| long res = streamQuery0(qry, schemaName, streamer, dml, params.get(i), qryInitiatorId); |
| |
| ress.add(res); |
| } |
| |
| return ress; |
| } |
| |
| /** |
| * Perform given statement against given data streamer. Only rows based INSERT is supported. |
| * |
| * @param qry Query. |
| * @param schemaName Schema name. |
| * @param streamer Streamer to feed data to. |
| * @param dml DML statement. |
| * @param args Statement arguments. |
| * @return Number of rows in given INSERT statement. |
| * @throws IgniteCheckedException if failed. |
| */ |
| @SuppressWarnings({"unchecked"}) |
| private long streamQuery0(String qry, String schemaName, IgniteDataStreamer streamer, QueryParserResultDml dml, |
| final Object[] args, String qryInitiatorId) throws IgniteCheckedException { |
| long qryId = runningQueryManager().register( |
| QueryUtils.INCLUDE_SENSITIVE ? qry : sqlWithoutConst(dml.statement()), |
| GridCacheQueryType.SQL_FIELDS, |
| schemaName, |
| true, |
| null, |
| qryInitiatorId, |
| false, |
| false, |
| false |
| ); |
| |
| Exception failReason = null; |
| |
| try { |
| UpdatePlan plan = dml.plan(); |
| |
| Iterator<List<?>> iter = new GridQueryCacheObjectsIterator(updateQueryRows(qryId, schemaName, plan, args), |
| objectContext(), true); |
| |
| if (!iter.hasNext()) |
| return 0; |
| |
| IgniteBiTuple<?, ?> t = plan.processRow(iter.next()); |
| |
| if (!iter.hasNext()) { |
| streamer.addData(t.getKey(), t.getValue()); |
| |
| return 1; |
| } |
| else { |
| Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount()); |
| |
| rows.put(t.getKey(), t.getValue()); |
| |
| while (iter.hasNext()) { |
| List<?> row = iter.next(); |
| |
| t = plan.processRow(row); |
| |
| rows.put(t.getKey(), t.getValue()); |
| } |
| |
| streamer.addData(rows); |
| |
| return rows.size(); |
| } |
| } |
| catch (IgniteException | IgniteCheckedException e) { |
| failReason = e; |
| |
| throw e; |
| } |
| finally { |
| runningQueryManager().unregister(qryId, failReason); |
| } |
| } |
| |
| /** |
| * Calculates rows for update query. |
| * |
| * @param qryId Query id. |
| * @param schemaName Schema name. |
| * @param plan Update plan. |
| * @param args Statement arguments. |
| * @return Rows for update. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private Iterator<List<?>> updateQueryRows(long qryId, String schemaName, UpdatePlan plan, Object[] args) |
| throws IgniteCheckedException { |
| Object[] params = args != null ? args : X.EMPTY_OBJECT_ARRAY; |
| |
| if (!F.isEmpty(plan.selectQuery())) { |
| SqlFieldsQuery selectQry = new SqlFieldsQuery(plan.selectQuery()) |
| .setArgs(params) |
| .setLocal(true); |
| |
| QueryParserResult selectParseRes = parser.parse(schemaName, selectQry, false); |
| |
| GridQueryFieldsResult res = executeSelectLocal( |
| qryId, |
| selectParseRes.queryDescriptor(), |
| selectParseRes.queryParameters(), |
| selectParseRes.select(), |
| null, |
| null, |
| 0 |
| ); |
| |
| return res.iterator(); |
| } |
| else |
| return plan.createRows(params).iterator(); |
| } |
| |
| /** |
| * Parse statement for streamer. |
| * |
| * @param schemaName Schema name. |
| * @param qry Query. |
| * @return DML. |
| */ |
| private QueryParserResultDml streamerParse(String schemaName, String qry) { |
| QueryParserResult parseRes = parser.parse(schemaName, new SqlFieldsQuery(qry), false); |
| |
| QueryParserResultDml dml = parseRes.dml(); |
| |
| if (dml == null || !dml.streamable()) { |
| throw new IgniteSQLException("Streaming mode supports only INSERT commands without subqueries.", |
| IgniteQueryErrorCode.UNSUPPORTED_OPERATION); |
| } |
| |
| return dml; |
| } |
| |
| /** |
| * @param size Result size. |
| * @return List of given size filled with 0Ls. |
| */ |
| private static List<Long> zeroBatchedStreamedUpdateResult(int size) { |
| Long[] res = new Long[size]; |
| |
| Arrays.fill(res, 0L); |
| |
| return Arrays.asList(res); |
| } |
| |
| /** |
| * Executes sql query statement. |
| * |
| * @param conn Connection,. |
| * @param stmt Statement. |
| * @param timeoutMillis Query timeout. |
| * @param cancel Query cancel. |
| * @return Result. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private ResultSet executeSqlQuery(final H2PooledConnection conn, final PreparedStatement stmt, |
| int timeoutMillis, @Nullable GridQueryCancel cancel) throws IgniteCheckedException { |
| if (cancel != null) |
| cancel.add(() -> cancelStatement(stmt)); |
| |
| Session ses = session(conn); |
| |
| if (timeoutMillis >= 0) |
| ses.setQueryTimeout(timeoutMillis); |
| else |
| ses.setQueryTimeout((int)distrCfg.defaultQueryTimeout()); |
| |
| try { |
| return stmt.executeQuery(); |
| } |
| catch (SQLException e) { |
| // Throw special exception. |
| if (e.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED) |
| throw new QueryCancelledException(); |
| |
| if (e.getCause() instanceof IgniteSQLException) |
| throw (IgniteSQLException)e.getCause(); |
| |
| throw new IgniteSQLException(e); |
| } |
| } |
| |
| /** |
| * Cancel prepared statement. |
| * |
| * @param stmt Statement. |
| */ |
| private static void cancelStatement(PreparedStatement stmt) { |
| try { |
| stmt.cancel(); |
| } |
| catch (SQLException ignored) { |
| // No-op. |
| } |
| } |
| |
| /** |
| * Executes sql query and prints warning if query is too slow.. |
| * |
| * @param conn Connection, |
| * @param sql Sql query. |
| * @param params Parameters. |
| * @param timeoutMillis Query timeout. |
| * @param cancel Query cancel. |
| * @param dataPageScanEnabled If data page scan is enabled. |
| * @return Result. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public ResultSet executeSqlQueryWithTimer( |
| H2PooledConnection conn, |
| String sql, |
| @Nullable Collection<Object> params, |
| int timeoutMillis, |
| @Nullable GridQueryCancel cancel, |
| Boolean dataPageScanEnabled, |
| final H2QueryInfo qryInfo |
| ) throws IgniteCheckedException { |
| PreparedStatement stmt = conn.prepareStatementNoCache(sql); |
| |
| H2Utils.bindParameters(stmt, params); |
| |
| return executeSqlQueryWithTimer(stmt, |
| conn, sql, timeoutMillis, cancel, dataPageScanEnabled, qryInfo); |
| } |
| |
| /** |
| * @param dataPageScanEnabled If data page scan is enabled. |
| */ |
| public void enableDataPageScan(Boolean dataPageScanEnabled) { |
| // Data page scan is enabled by default for SQL. |
| // TODO https://issues.apache.org/jira/browse/IGNITE-11998 |
| CacheDataTree.setDataPageScanEnabled(false); |
| } |
| |
| /** |
| * Executes sql query and prints warning if query is too slow. |
| * |
| * @param stmt Prepared statement for query. |
| * @param conn Connection. |
| * @param sql Sql query. |
| * @param timeoutMillis Query timeout. |
| * @param cancel Query cancel. |
| * @param dataPageScanEnabled If data page scan is enabled. |
| * @return Result. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public ResultSet executeSqlQueryWithTimer( |
| PreparedStatement stmt, |
| H2PooledConnection conn, |
| String sql, |
| int timeoutMillis, |
| @Nullable GridQueryCancel cancel, |
| Boolean dataPageScanEnabled, |
| final H2QueryInfo qryInfo |
| ) throws IgniteCheckedException { |
| if (qryInfo != null) |
| heavyQryTracker.startTracking(qryInfo); |
| |
| enableDataPageScan(dataPageScanEnabled); |
| |
| Throwable err = null; |
| try ( |
| TraceSurroundings ignored = MTC.support(ctx.tracing() |
| .create(SQL_QRY_EXECUTE, MTC.span()) |
| .addTag(SQL_QRY_TEXT, () -> sql)) |
| ) { |
| return executeSqlQuery(conn, stmt, timeoutMillis, cancel); |
| } |
| catch (Throwable e) { |
| err = e; |
| |
| throw e; |
| } |
| finally { |
| CacheDataTree.setDataPageScanEnabled(false); |
| |
| if (qryInfo != null) |
| heavyQryTracker.stopTracking(qryInfo, err); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("deprecation") |
| @Override public SqlFieldsQuery generateFieldsQuery(String cacheName, SqlQuery qry) { |
| String schemaName = ctx.query().schemaManager().schemaName(cacheName); |
| |
| String type = qry.getType(); |
| |
| H2TableDescriptor tblDesc = schemaMgr.tableForType(schemaName, cacheName, type); |
| |
| if (tblDesc == null) |
| throw new IgniteSQLException("Failed to find SQL table for type: " + type, |
| IgniteQueryErrorCode.TABLE_NOT_FOUND); |
| |
| String sql; |
| |
| try { |
| sql = generateFieldsQueryString(qry.getSql(), qry.getAlias(), tblDesc); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| |
| SqlFieldsQuery res = QueryUtils.withQueryTimeout(new SqlFieldsQuery(sql), qry.getTimeout(), TimeUnit.MILLISECONDS); |
| res.setArgs(qry.getArgs()); |
| res.setDistributedJoins(qry.isDistributedJoins()); |
| res.setLocal(qry.isLocal()); |
| res.setPageSize(qry.getPageSize()); |
| res.setPartitions(qry.getPartitions()); |
| res.setReplicatedOnly(qry.isReplicatedOnly()); |
| res.setSchema(schemaName); |
| res.setSql(sql); |
| |
| return res; |
| } |
| |
| /** |
| * Execute command. |
| * |
| * @param qryDesc Query descriptor. |
| * @param qryParams Query parameters. |
| * @param cliCtx CLient context. |
| * @param cmd Command (native). |
| * @return Result. |
| */ |
| private FieldsQueryCursor<List<?>> executeCommand( |
| QueryDescriptor qryDesc, |
| QueryParameters qryParams, |
| @Nullable SqlClientContext cliCtx, |
| QueryParserResultCommand cmd |
| ) { |
| if (cmd.noOp()) |
| return zeroCursor(); |
| |
| SqlCommand cmdNative = cmd.commandNative(); |
| GridSqlStatement cmdH2 = cmd.commandH2(); |
| |
| if (qryDesc.local()) { |
| throw new IgniteSQLException("DDL statements are not supported for execution on local nodes only", |
| IgniteQueryErrorCode.UNSUPPORTED_OPERATION); |
| } |
| |
| long qryId = registerRunningQuery(qryDesc, qryParams, null, null); |
| |
| CommandResult res = null; |
| |
| Exception failReason = null; |
| |
| try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_CMD_QRY_EXECUTE, MTC.span()))) { |
| res = cmdProc.runCommand(qryDesc.sql(), cmdNative, cmdH2, qryParams, cliCtx, qryId); |
| |
| return res.cursor(); |
| } |
| catch (IgniteException e) { |
| failReason = e; |
| |
| throw e; |
| } |
| catch (IgniteCheckedException e) { |
| failReason = e; |
| |
| throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qryDesc.sql() + |
| ", err=" + e.getMessage() + ']', e); |
| } |
| finally { |
| if (res == null || res.unregisterRunningQuery()) |
| runningQueryManager().unregister(qryId, failReason); |
| } |
| } |
| |
| /** |
| * Check cluster state. |
| */ |
| private void checkClusterState() { |
| if (!ctx.state().publicApiActiveState(true)) { |
| throw new IgniteException("Can not perform the operation because the cluster is inactive. Note, " + |
| "that the cluster is considered inactive by default if Ignite Persistent Store is used to " + |
| "let all the nodes join the cluster. To activate the cluster call" + |
| " Ignite.cluster().state(ClusterState.ACTIVE)."); |
| } |
| } |
| |
    /** {@inheritDoc} */
    @SuppressWarnings({"StringEquality"})
    @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(
        String schemaName,
        SqlFieldsQuery qry,
        @Nullable SqlClientContext cliCtx,
        boolean keepBinary,
        boolean failOnMultipleStmts,
        GridQueryCancel cancel
    ) {
        List<FieldsQueryCursor<List<?>>> res = new ArrayList<>(1);

        // The query text may contain several statements; each parser pass consumes one statement
        // and returns the rest as the "remaining" query, so we loop until nothing is left.
        SqlFieldsQuery remainingQry = qry;

        while (remainingQry != null) {
            // One tracing span per individual statement.
            Span qrySpan = ctx.tracing().create(SQL_QRY, MTC.span())
                .addTag(SQL_SCHEMA, () -> schemaName);

            try (TraceSurroundings ignored = MTC.supportContinual(qrySpan)) {
                // Parse.
                QueryParserResult parseRes = parser.parse(schemaName, remainingQry, !failOnMultipleStmts);

                qrySpan.addTag(SQL_QRY_TEXT, () -> parseRes.queryDescriptor().sql());

                remainingQry = parseRes.remainingQuery();

                // Get next command.
                QueryDescriptor newQryDesc = parseRes.queryDescriptor();
                QueryParameters newQryParams = parseRes.queryParameters();

                // Check if there is enough parameters. Batched statements are not checked at this point
                // since they pass parameters differently.
                if (!newQryDesc.batched()) {
                    int qryParamsCnt = F.isEmpty(newQryParams.arguments()) ? 0 : newQryParams.arguments().length;

                    if (qryParamsCnt < parseRes.parametersCount())
                        throw new IgniteSQLException("Invalid number of query parameters [expected=" +
                            parseRes.parametersCount() + ", actual=" + qryParamsCnt + ']');
                }

                // Check if cluster state is valid.
                checkClusterState();

                // Execute. Statement kind (command / DML / SELECT) decides the execution path.
                if (parseRes.isCommand()) {
                    QueryParserResultCommand cmd = parseRes.command();

                    assert cmd != null;

                    // A no-op command with empty SQL and nothing left to parse (e.g. a bare ';')
                    // contributes no cursor.
                    if (cmd.noOp() && remainingQry == null && newQryDesc.sql().isEmpty())
                        continue;

                    FieldsQueryCursor<List<?>> cmdRes = executeCommand(
                        newQryDesc,
                        newQryParams,
                        cliCtx,
                        cmd
                    );

                    res.add(cmdRes);
                }
                else if (parseRes.isDml()) {
                    QueryParserResultDml dml = parseRes.dml();

                    assert dml != null;

                    List<? extends FieldsQueryCursor<List<?>>> dmlRes = executeDml(
                        newQryDesc,
                        newQryParams,
                        dml,
                        cancel
                    );

                    res.addAll(dmlRes);
                }
                else {
                    assert parseRes.isSelect();

                    QueryParserResultSelect select = parseRes.select();

                    assert select != null;

                    List<? extends FieldsQueryCursor<List<?>>> qryRes = executeSelect(
                        newQryDesc,
                        newQryParams,
                        select,
                        keepBinary,
                        cancel
                    );

                    res.addAll(qryRes);
                }
            }
            catch (Throwable th) {
                // Record the failure in the span before propagating.
                qrySpan.addTag(ERROR, th::getMessage).end();

                throw th;
            }
        }

        // Every processed statement added a cursor, so an empty result means the text held no
        // executable statements.
        if (res.isEmpty())
            throw new SqlParseException(qry.getSql(), 0, IgniteQueryErrorCode.PARSING, "Invalid SQL query.");

        return res;
    }
| |
| /** |
| * Execute an all-ready {@link SqlFieldsQuery}. |
| * |
| * @param qryDesc Plan key. |
| * @param qryParams Parameters. |
| * @param dml DML. |
| * @param cancel Query cancel state holder. |
| * @return Query result. |
| */ |
| private List<? extends FieldsQueryCursor<List<?>>> executeDml( |
| QueryDescriptor qryDesc, |
| QueryParameters qryParams, |
| QueryParserResultDml dml, |
| GridQueryCancel cancel |
| ) { |
| IndexingQueryFilter filter = (qryDesc.local() ? backupFilter(null, qryParams.partitions()) : null); |
| |
| long qryId = registerRunningQuery(qryDesc, qryParams, cancel, dml.statement()); |
| |
| Exception failReason = null; |
| |
| try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_DML_QRY_EXECUTE, MTC.span()))) { |
| if (!updateInTxAllowed && ctx.cache().context().tm().inUserTx()) { |
| throw new IgniteSQLException("DML statements are not allowed inside a transaction over " + |
| "cache(s) with TRANSACTIONAL atomicity mode (disable this error message with system property " + |
| "\"-DIGNITE_ALLOW_DML_INSIDE_TRANSACTION=true\")"); |
| } |
| |
| if (!qryDesc.local()) { |
| return executeUpdateDistributed( |
| qryId, |
| qryDesc, |
| qryParams, |
| dml, |
| cancel |
| ); |
| } |
| else { |
| UpdateResult updRes = executeUpdate( |
| qryId, |
| qryDesc, |
| qryParams, |
| dml, |
| true, |
| filter, |
| cancel |
| ); |
| |
| return singletonList(new QueryCursorImpl<>(new Iterable<List<?>>() { |
| @Override public Iterator<List<?>> iterator() { |
| return new IgniteSingletonIterator<>(singletonList(updRes.counter())); |
| } |
| }, cancel, true, false)); |
| } |
| } |
| catch (IgniteException e) { |
| failReason = e; |
| |
| throw e; |
| } |
| catch (IgniteCheckedException e) { |
| failReason = e; |
| |
| IgniteClusterReadOnlyException roEx = X.cause(e, IgniteClusterReadOnlyException.class); |
| |
| if (roEx != null) { |
| throw new IgniteSQLException( |
| "Failed to execute DML statement. Cluster in read-only mode [stmt=" + qryDesc.sql() + |
| ", params=" + Arrays.deepToString(qryParams.arguments()) + "]", |
| IgniteQueryErrorCode.CLUSTER_READ_ONLY_MODE_ENABLED, |
| e |
| ); |
| } |
| |
| throw new IgniteSQLException("Failed to execute DML statement [stmt=" + qryDesc.sql() + |
| ", params=" + Arrays.deepToString(qryParams.arguments()) + "]", e); |
| } |
| finally { |
| runningQueryManager().unregister(qryId, failReason); |
| } |
| } |
| |
| /** |
| * Execute an all-ready {@link SqlFieldsQuery}. |
| * |
| * @param qryDesc Plan key. |
| * @param qryParams Parameters. |
| * @param select Select. |
| * @param keepBinary Whether binary objects must not be deserialized automatically. |
| * @param cancel Query cancel state holder. |
| * @return Query result. |
| */ |
| private List<? extends FieldsQueryCursor<List<?>>> executeSelect( |
| QueryDescriptor qryDesc, |
| QueryParameters qryParams, |
| QueryParserResultSelect select, |
| boolean keepBinary, |
| GridQueryCancel cancel |
| ) { |
| assert cancel != null; |
| |
| // Register query. |
| long qryId = registerRunningQuery(qryDesc, qryParams, cancel, select.statement()); |
| |
| try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_CURSOR_OPEN, MTC.span()))) { |
| Iterable<List<?>> iter = executeSelect0( |
| qryId, |
| qryDesc, |
| qryParams, |
| select, |
| keepBinary, |
| cancel, |
| qryParams.timeout()); |
| |
| RegisteredQueryCursor<List<?>> cursor = new RegisteredQueryCursor<>(iter, cancel, runningQueryManager(), |
| qryParams.lazy(), qryId, ctx.tracing()); |
| |
| cancel.add(cursor::cancel); |
| |
| cursor.fieldsMeta(select.meta()); |
| |
| cursor.partitionResult(select.twoStepQuery() != null ? select.twoStepQuery().derivedPartitions() : null); |
| |
| return singletonList(cursor); |
| } |
| catch (Exception e) { |
| runningQueryManager().unregister(qryId, e); |
| |
| if (e instanceof IgniteCheckedException) |
| throw U.convertException((IgniteCheckedException)e); |
| |
| if (e instanceof RuntimeException) |
| throw (RuntimeException)e; |
| |
| throw new IgniteSQLException("Failed to execute SELECT statement: " + qryDesc.sql(), e); |
| } |
| } |
| |
| /** |
| * Execute SELECT statement for DML. |
| * |
| * @param qryId Query id. |
| * @param schema Schema. |
| * @param selectQry Select query. |
| * @param cancel Cancel. |
| * @param timeout Timeout. |
| * @return Fields query. |
| */ |
| private QueryCursorImpl<List<?>> executeSelectForDml( |
| long qryId, |
| String schema, |
| SqlFieldsQuery selectQry, |
| GridQueryCancel cancel, |
| int timeout |
| ) { |
| QueryParserResult parseRes = parser.parse(schema, selectQry, false); |
| |
| QueryParserResultSelect select = parseRes.select(); |
| |
| assert select != null; |
| |
| Iterable<List<?>> iter = executeSelect0( |
| qryId, |
| parseRes.queryDescriptor(), |
| parseRes.queryParameters(), |
| select, |
| true, |
| cancel, |
| timeout |
| ); |
| |
| QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(iter, cancel, true, parseRes.queryParameters().lazy()); |
| |
| cursor.fieldsMeta(select.meta()); |
| |
| cursor.partitionResult(select.twoStepQuery() != null ? select.twoStepQuery().derivedPartitions() : null); |
| |
| return cursor; |
| } |
| |
| /** |
| * Execute an all-ready {@link SqlFieldsQuery}. |
| * |
| * @param qryId Query id. |
| * @param qryDesc Plan key. |
| * @param qryParams Parameters. |
| * @param select Select. |
| * @param keepBinary Whether binary objects must not be deserialized automatically. |
| * @param cancel Query cancel state holder. |
| * @param timeout Timeout. |
| * @return Query result. |
| */ |
| private Iterable<List<?>> executeSelect0( |
| long qryId, |
| QueryDescriptor qryDesc, |
| QueryParameters qryParams, |
| QueryParserResultSelect select, |
| boolean keepBinary, |
| GridQueryCancel cancel, |
| int timeout |
| ) { |
| // Check security. |
| if (ctx.security().enabled()) |
| checkSecurity(select.cacheIds()); |
| |
| Iterable<List<?>> iter; |
| |
| if (select.splitNeeded()) { |
| // Distributed query. |
| GridCacheTwoStepQuery twoStepQry = select.twoStepQuery(); |
| |
| assert twoStepQry != null; |
| |
| iter = executeSelectDistributed( |
| qryId, |
| qryDesc, |
| qryParams, |
| twoStepQry, |
| keepBinary, |
| cancel, |
| timeout |
| ); |
| } |
| else { |
| // Local query. |
| IndexingQueryFilter filter = (qryDesc.local() ? backupFilter(null, qryParams.partitions()) : null); |
| |
| GridQueryFieldsResult res = executeSelectLocal( |
| qryId, |
| qryDesc, |
| qryParams, |
| select, |
| filter, |
| cancel, |
| timeout |
| ); |
| |
| iter = () -> { |
| try { |
| return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), keepBinary); |
| } |
| catch (IgniteCheckedException | IgniteSQLException e) { |
| throw new CacheException(e); |
| } |
| }; |
| } |
| |
| return iter; |
| } |
| |
| /** |
| * Register running query. |
| * |
| * @param qryDesc Query descriptor. |
| * @param qryParams Query parameters. |
| * @param cancel Query cancel state holder. |
| * @param stmnt Parsed statement. |
| * @return Id of registered query or {@code null} if query wasn't registered. |
| */ |
| private long registerRunningQuery( |
| QueryDescriptor qryDesc, |
| QueryParameters qryParams, |
| GridQueryCancel cancel, |
| @Nullable GridSqlStatement stmnt |
| ) { |
| String qry = QueryUtils.INCLUDE_SENSITIVE || stmnt == null ? qryDesc.sql() : sqlWithoutConst(stmnt); |
| |
| long res = runningQueryManager().register( |
| qry, |
| GridCacheQueryType.SQL_FIELDS, |
| qryDesc.schemaName(), |
| qryDesc.local(), |
| cancel, |
| qryDesc.queryInitiatorId(), |
| qryDesc.enforceJoinOrder(), |
| qryParams.lazy(), |
| qryDesc.distributedJoins() |
| ); |
| |
| if (ctx.event().isRecordable(EVT_SQL_QUERY_EXECUTION)) { |
| ctx.event().record(new SqlQueryExecutionEvent( |
| ctx.discovery().localNode(), |
| GridCacheQueryType.SQL_FIELDS.name() + " query execution.", |
| qry, |
| qryParams.arguments(), |
| ctx.security().enabled() ? ctx.security().securityContext().subject().id() : null)); |
| } |
| |
| return res; |
| } |
| |
| /** |
| * @param stmnt Statement to print. |
| * @return SQL query where constant replaced with '?' char. |
| * @see GridSqlConst#getSQL() |
| * @see QueryUtils#includeSensitive() |
| */ |
| private String sqlWithoutConst(GridSqlStatement stmnt) { |
| QueryUtils.INCLUDE_SENSITIVE_TL.set(false); |
| |
| try { |
| return stmnt.getSQL(); |
| } |
| finally { |
| QueryUtils.INCLUDE_SENSITIVE_TL.set(true); |
| } |
| } |
| |
| /** |
| * Check security access for caches. |
| * |
| * @param cacheIds Cache IDs. |
| */ |
| private void checkSecurity(Collection<Integer> cacheIds) { |
| if (F.isEmpty(cacheIds)) |
| return; |
| |
| for (Integer cacheId : cacheIds) { |
| DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheId); |
| |
| if (desc != null) |
| ctx.security().authorize(desc.cacheName(), SecurityPermission.CACHE_READ); |
| } |
| } |
| |
| /** |
| * @param lsnr Listener. |
| */ |
| public void registerQueryStartedListener(Consumer<GridQueryStartedInfo> lsnr) { |
| runningQueryManager().registerQueryStartedListener(lsnr); |
| } |
| |
| /** |
| * @param lsnr Listener. |
| */ |
| public boolean unregisterQueryStartedListener(Object lsnr) { |
| return runningQueryManager().unregisterQueryStartedListener(lsnr); |
| } |
| |
| /** |
| * @param lsnr Listener. |
| */ |
| public void registerQueryFinishedListener(Consumer<GridQueryFinishedInfo> lsnr) { |
| runningQueryManager().registerQueryFinishedListener(lsnr); |
| } |
| |
| /** |
| * @param lsnr Listener. |
| */ |
| public boolean unregisterQueryFinishedListener(Object lsnr) { |
| return runningQueryManager().unregisterQueryFinishedListener(lsnr); |
| } |
| |
| /** |
| * Run distributed query on detected set of partitions. |
| * |
| * @param qryId Query id. |
| * @param qryDesc Query descriptor. |
| * @param qryParams Query parameters. |
| * @param twoStepQry Two-step query. |
| * @param keepBinary Keep binary flag. |
| * @param cancel Cancel handler. |
| * @param timeout Timeout. |
| * @return Cursor representing distributed query result. |
| */ |
| @SuppressWarnings("IfMayBeConditional") |
| private Iterable<List<?>> executeSelectDistributed( |
| final long qryId, |
| final QueryDescriptor qryDesc, |
| final QueryParameters qryParams, |
| final GridCacheTwoStepQuery twoStepQry, |
| final boolean keepBinary, |
| final GridQueryCancel cancel, |
| int timeout |
| ) { |
| // When explicit partitions are set, there must be an owning cache they should be applied to. |
| PartitionResult derivedParts = twoStepQry.derivedPartitions(); |
| |
| final int[] parts = PartitionResult.calculatePartitions( |
| qryParams.partitions(), |
| derivedParts, |
| qryParams.arguments() |
| ); |
| |
| Iterable<List<?>> iter; |
| |
| if (parts != null && parts.length == 0) { |
| iter = new Iterable<List<?>>() { |
| @Override public Iterator<List<?>> iterator() { |
| return new Iterator<List<?>>() { |
| @Override public boolean hasNext() { |
| return false; |
| } |
| |
| @SuppressWarnings("IteratorNextCanNotThrowNoSuchElementException") |
| @Override public List<?> next() { |
| return null; |
| } |
| }; |
| } |
| }; |
| } |
| else { |
| iter = new Iterable<List<?>>() { |
| @Override public Iterator<List<?>> iterator() { |
| try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_ITER_OPEN, MTC.span()))) { |
| return IgniteH2Indexing.this.rdcQryExec.query( |
| qryId, |
| qryDesc.schemaName(), |
| twoStepQry, |
| keepBinary, |
| qryDesc.enforceJoinOrder(), |
| timeout, |
| cancel, |
| qryParams.arguments(), |
| parts, |
| qryParams.lazy(), |
| qryParams.dataPageScanEnabled(), |
| qryParams.pageSize() |
| ); |
| } |
| } |
| }; |
| } |
| |
| return iter; |
| } |
| |
| /** |
| * Executes DML request on map node. Happens only for "skip reducer" mode. |
| * |
| * @param schemaName Schema name. |
| * @param qry Query. |
| * @param filter Filter. |
| * @param cancel Cancel state. |
| * @param loc Locality flag. |
| * @return Update result. |
| * @throws IgniteCheckedException if failed. |
| */ |
| public UpdateResult executeUpdateOnDataNode( |
| String schemaName, |
| SqlFieldsQuery qry, |
| IndexingQueryFilter filter, |
| GridQueryCancel cancel, |
| boolean loc |
| ) throws IgniteCheckedException { |
| QueryParserResult parseRes = parser.parse(schemaName, qry, false); |
| |
| assert parseRes.remainingQuery() == null; |
| |
| QueryParserResultDml dml = parseRes.dml(); |
| |
| assert dml != null; |
| |
| return executeUpdate( |
| RunningQueryManager.UNDEFINED_QUERY_ID, |
| parseRes.queryDescriptor(), |
| parseRes.queryParameters(), |
| dml, |
| loc, |
| filter, |
| cancel |
| ); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isStreamableInsertStatement(String schemaName, SqlFieldsQuery qry) throws SQLException { |
| QueryParserResult parsed = parser.parse(schemaName, qry, true); |
| |
| return parsed.isDml() && parsed.dml().streamable() && parsed.remainingQuery() == null; |
| } |
| |
| /** |
| * @return Busy lock. |
| */ |
| public GridSpinBusyLock busyLock() { |
| return busyLock; |
| } |
| |
| /** |
| * @return Map query executor. |
| */ |
| public GridMapQueryExecutor mapQueryExecutor() { |
| return mapQryExec; |
| } |
| |
| /** |
| * @return Reduce query executor. |
| */ |
| public GridReduceQueryExecutor reduceQueryExecutor() { |
| return rdcQryExec; |
| } |
| |
    /** {@inheritDoc} */
    @Override public RunningQueryManager runningQueryManager() {
        // The manager is owned by the query processor; this module does not keep its own copy.
        return ctx.query().runningQueryManager();
    }
| |
    /** {@inheritDoc} */
    @SuppressWarnings({"deprecation", "AssignmentToStaticFieldFromInstanceMethod"})
    @Override public void start(GridKernalContext ctx, GridSpinBusyLock busyLock) throws IgniteCheckedException {
        if (log.isDebugEnabled())
            log.debug("Starting cache query index...");

        // During this check we also load necessary spatial index utils class.
        if (H2Utils.checkSpatialIndexEnabled() && log.isDebugEnabled())
            log.debug("Spatial indexes are enabled.");

        this.busyLock = busyLock;

        // H2's Java object serialization is force-disabled; our own serializer is installed below.
        if (SysProperties.serializeJavaObject) {
            U.warn(log, "Serialization of Java objects in H2 was enabled.");

            SysProperties.serializeJavaObject = false;
        }

        this.ctx = ctx;

        // Initialization order matters: managers created here are used by the components below.
        partReservationMgr = new PartitionReservationManager(ctx);

        connMgr = new ConnectionManager(ctx);

        heavyQryTracker = ctx.query().runningQueryManager().heavyQueriesTracker();

        // 'cmdProc' is assigned later; the lambda defers the dereference until parse time.
        parser = new QueryParser(this, connMgr, cmd -> cmdProc.isCommandSupported(cmd));

        schemaMgr = new H2SchemaManager(ctx, this, connMgr);
        schemaMgr.start();

        nodeId = ctx.localNodeId();
        marshaller = ctx.config().getMarshaller();

        mapQryExec = new GridMapQueryExecutor();
        rdcQryExec = new GridReduceQueryExecutor();

        mapQryExec.start(ctx, this);
        rdcQryExec.start(ctx, this);

        // Both executors must observe node-left events to fail over in-flight queries.
        discoLsnr = evt -> {
            mapQryExec.onNodeLeft((DiscoveryEvent)evt);
            rdcQryExec.onNodeLeft((DiscoveryEvent)evt);
        };

        ctx.event().addLocalEventListener(discoLsnr, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);

        qryLsnr = (nodeId, msg, plc) -> onMessage(nodeId, msg);

        ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, qryLsnr);

        partExtractor = new PartitionExtractor(new H2PartitionResolver(this), ctx);

        cmdProc = new CommandProcessor(ctx, schemaMgr, this);

        if (JdbcUtils.serializer != null)
            U.warn(log, "Custom H2 serialization is already configured, will override.");

        JdbcUtils.serializer = h2Serializer();

        distrCfg = new DistributedIndexingConfiguration(ctx, log);

        funcMgr = new FunctionsManager(distrCfg);

        // Setup default index key type settings.
        CompareMode compareMode = connMgr.dataHandler().getCompareMode();

        ctx.indexProcessor().keyTypeSettings()
            .stringOptimizedCompare(CompareMode.OFF.equals(compareMode.getName()))
            .binaryUnsigned(compareMode.isBinaryUnsigned());

        // Cached query plans become stale when the schema changes; drop them on column changes.
        ctx.internalSubscriptionProcessor().registerSchemaChangeListener(new AbstractSchemaChangeListener() {
            /** */
            @Override public void onColumnsAdded(
                String schemaName,
                GridQueryTypeDescriptor typeDesc,
                GridCacheContextInfo<?, ?> cacheInfo,
                List<QueryField> cols
            ) {
                clearPlanCache();
            }

            /** */
            @Override public void onColumnsDropped(
                String schemaName,
                GridQueryTypeDescriptor typeDesc,
                GridCacheContextInfo<?, ?> cacheInfo,
                List<String> cols
            ) {
                clearPlanCache();
            }
        });
    }
| |
| /** |
| * @param nodeId Node ID. |
| * @param msg Message. |
| */ |
| public void onMessage(UUID nodeId, Object msg) { |
| assert msg != null; |
| |
| ClusterNode node = ctx.discovery().node(nodeId); |
| |
| if (node == null) |
| return; // Node left, ignore. |
| |
| if (!busyLock.enterBusy()) |
| return; |
| |
| try { |
| if (msg instanceof GridCacheQueryMarshallable) |
| ((GridCacheQueryMarshallable)msg).unmarshall(ctx.config().getMarshaller(), ctx); |
| |
| try { |
| boolean processed = true; |
| |
| boolean tracebleMsg = false; |
| |
| if (msg instanceof GridQueryNextPageRequest) { |
| mapQueryExecutor().onNextPageRequest(node, (GridQueryNextPageRequest)msg); |
| |
| tracebleMsg = true; |
| } |
| else if (msg instanceof GridQueryNextPageResponse) { |
| reduceQueryExecutor().onNextPage(node, (GridQueryNextPageResponse)msg); |
| |
| tracebleMsg = true; |
| } |
| else if (msg instanceof GridH2QueryRequest) |
| mapQueryExecutor().onQueryRequest(node, (GridH2QueryRequest)msg); |
| else if (msg instanceof GridH2DmlRequest) |
| mapQueryExecutor().onDmlRequest(node, (GridH2DmlRequest)msg); |
| else if (msg instanceof GridH2DmlResponse) |
| reduceQueryExecutor().onDmlResponse(node, (GridH2DmlResponse)msg); |
| else if (msg instanceof GridQueryFailResponse) |
| reduceQueryExecutor().onFail(node, (GridQueryFailResponse)msg); |
| else if (msg instanceof GridQueryCancelRequest) |
| mapQueryExecutor().onCancel(node, (GridQueryCancelRequest)msg); |
| else |
| processed = false; |
| |
| if (processed && log.isDebugEnabled() && (!tracebleMsg || log.isTraceEnabled())) |
| log.debug("Processed message: [srcNodeId=" + nodeId + ", msg=" + msg + ']'); |
| } |
| catch (Throwable th) { |
| U.error(log, "Failed to process message: [srcNodeId=" + nodeId + ", msg=" + msg + ']', th); |
| } |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @return Value object context. |
| */ |
| public CacheObjectValueContext objectContext() { |
| return ctx.query().objectContext(); |
| } |
| |
| /** |
| * @param topic Topic. |
| * @param topicOrd Topic ordinal for {@link GridTopic}. |
| * @param nodes Nodes. |
| * @param msg Message. |
| * @param specialize Optional closure to specialize message for each node. |
| * @param locNodeHnd Handler for local node. |
| * @param plc Policy identifying the executor service which will process message. |
| * @param runLocParallel Run local handler in parallel thread. |
| * @return {@code true} If all messages sent successfully. |
| */ |
| public boolean send( |
| Object topic, |
| int topicOrd, |
| Collection<ClusterNode> nodes, |
| Message msg, |
| @Nullable IgniteBiClosure<ClusterNode, Message, Message> specialize, |
| @Nullable final IgniteInClosure2X<ClusterNode, Message> locNodeHnd, |
| byte plc, |
| boolean runLocParallel |
| ) { |
| boolean ok = true; |
| |
| if (specialize == null && msg instanceof GridCacheQueryMarshallable) |
| ((GridCacheQueryMarshallable)msg).marshall(marshaller); |
| |
| ClusterNode locNode = null; |
| |
| for (ClusterNode node : nodes) { |
| if (node.isLocal()) { |
| if (locNode != null) |
| throw new IllegalStateException(); |
| |
| locNode = node; |
| |
| continue; |
| } |
| |
| try { |
| if (specialize != null) { |
| msg = specialize.apply(node, msg); |
| |
| if (msg instanceof GridCacheQueryMarshallable) |
| ((GridCacheQueryMarshallable)msg).marshall(marshaller); |
| } |
| |
| ctx.io().sendGeneric(node, topic, topicOrd, msg, plc); |
| } |
| catch (IgniteCheckedException e) { |
| ok = false; |
| |
| U.warn(log, "Failed to send message [node=" + node + ", msg=" + msg + |
| ", errMsg=" + e.getMessage() + "]"); |
| } |
| } |
| |
| // Local node goes the last to allow parallel execution. |
| if (locNode != null) { |
| assert locNodeHnd != null; |
| |
| if (specialize != null) { |
| msg = specialize.apply(locNode, msg); |
| |
| if (msg instanceof GridCacheQueryMarshallable) |
| ((GridCacheQueryMarshallable)msg).marshall(marshaller); |
| } |
| |
| if (runLocParallel) { |
| final ClusterNode finalLocNode = locNode; |
| final Message finalMsg = msg; |
| |
| try { |
| // We prefer runLocal to runLocalSafe, because the latter can produce deadlock here. |
| ctx.closure().runLocal(new GridPlainRunnable() { |
| @Override public void run() { |
| if (!busyLock.enterBusy()) |
| return; |
| |
| try { |
| locNodeHnd.apply(finalLocNode, finalMsg); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| }, plc).listen(logger); |
| } |
| catch (IgniteCheckedException e) { |
| ok = false; |
| |
| U.error(log, "Failed to execute query locally.", e); |
| } |
| } |
| else |
| locNodeHnd.apply(locNode, msg); |
| } |
| |
| return ok; |
| } |
| |
| /** |
| * @return Serializer. |
| */ |
| private JavaObjectSerializer h2Serializer() { |
| return new H2JavaObjectSerializer(); |
| } |
| |
    /** {@inheritDoc} */
    @Override public void stop() {
        if (log.isDebugEnabled())
            log.debug("Stopping cache query index...");

        // Stop order: map executor first, then shared query contexts, then connections.
        mapQryExec.stop();

        qryCtxRegistry.clearSharedOnLocalNodeStop();

        connMgr.stop();

        if (log.isDebugEnabled())
            log.debug("Cache query index stopped.");
    }
| |
    /** {@inheritDoc} */
    @Override public void onClientDisconnect() {
        // No-op: client disconnect handling is done in onDisconnected(IgniteFuture).
    }
| |
    /** {@inheritDoc} */
    @Override public void registerCache(String cacheName, String schemaName, GridCacheContextInfo<?, ?> cacheInfo) {
        // No-op: nothing cache-specific to set up in this module on registration.
    }
| |
    /** {@inheritDoc} */
    @Override public void unregisterCache(GridCacheContextInfo<?, ?> cacheInfo) {
        String cacheName = cacheInfo.name();

        // Release partitions reserved for queries against this cache.
        partReservationMgr.onCacheStop(cacheName);

        // Unregister connection.
        connMgr.onCacheDestroyed();

        // Clear query cache.
        clearPlanCache();
    }
| |
| /** |
| * Remove all cached queries from cached two-steps queries. |
| */ |
| private void clearPlanCache() { |
| parser.clearCache(); |
| } |
| |
    /** {@inheritDoc} */
    @Override public IndexingQueryFilter backupFilter(@Nullable final AffinityTopologyVersion topVer,
        @Nullable final int[] parts) {
        // Delegate with 'treatReplicatedAsPartitioned' disabled by default.
        return backupFilter(topVer, parts, false);
    }
| |
| /** |
| * Returns backup filter. |
| * |
| * @param topVer Topology version. |
| * @param parts Partitions. |
| * @param treatReplicatedAsPartitioned true if need to treat replicated as partitioned (for outer joins). |
| * @return Backup filter. |
| */ |
| public IndexingQueryFilter backupFilter(@Nullable final AffinityTopologyVersion topVer, @Nullable final int[] parts, |
| boolean treatReplicatedAsPartitioned) { |
| return new IndexingQueryFilterImpl(ctx, topVer, parts, treatReplicatedAsPartitioned); |
| } |
| |
| /** |
| * @return Ready topology version. |
| */ |
| public AffinityTopologyVersion readyTopologyVersion() { |
| return ctx.cache().context().exchange().readyAffinityVersion(); |
| } |
| |
| /** |
| * @param readyVer Ready topology version. |
| * |
| * @return {@code true} If pending distributed exchange exists because server topology is changed. |
| */ |
| public boolean serverTopologyChanged(AffinityTopologyVersion readyVer) { |
| GridDhtPartitionsExchangeFuture fut = ctx.cache().context().exchange().lastTopologyFuture(); |
| |
| if (fut.isDone()) |
| return false; |
| |
| AffinityTopologyVersion initVer = fut.initialVersion(); |
| |
| return initVer.compareTo(readyVer) > 0 && !fut.firstEvent().node().isClient(); |
| } |
| |
| /** |
| * @param topVer Topology version. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void awaitForReadyTopologyVersion(AffinityTopologyVersion topVer) throws IgniteCheckedException { |
| ctx.cache().context().exchange().affinityReadyFuture(topVer).get(); |
| } |
| |
    /** {@inheritDoc} */
    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
        // Propagate the disconnect to the reduce query executor.
        rdcQryExec.onDisconnected(reconnectFut);
    }
| |
    /** {@inheritDoc} */
    @Override public void onKernalStop() {
        connMgr.onKernalStop();

        // Remove the listeners registered in start().
        ctx.io().removeMessageListener(GridTopic.TOPIC_QUERY, qryLsnr);
        ctx.event().removeLocalEventListener(discoLsnr);
    }
| |
| /** |
| * @return Query context registry. |
| */ |
| public QueryContextRegistry queryContextRegistry() { |
| return qryCtxRegistry; |
| } |
| |
| /** |
| * @return Connection manager. |
| */ |
| public ConnectionManager connections() { |
| return connMgr; |
| } |
| |
| /** |
| * @return Parser. |
| */ |
| public QueryParser parser() { |
| return parser; |
| } |
| |
| /** |
| * @return Schema manager. |
| */ |
| public H2SchemaManager schemaManager() { |
| return schemaMgr; |
| } |
| |
| /** |
| * @return Partition extractor. |
| */ |
| public PartitionExtractor partitionExtractor() { |
| return partExtractor; |
| } |
| |
| /** |
| * @return Partition reservation manager. |
| */ |
| public PartitionReservationManager partitionReservationManager() { |
| return partReservationMgr; |
| } |
| |
| /** |
| * @param qryId Query id. |
| * @param qryDesc Query descriptor. |
| * @param qryParams Query parameters. |
| * @param dml DML statement. |
| * @param cancel Query cancel. |
| * @return Update result wrapped into {@link GridQueryFieldsResult} |
| * @throws IgniteCheckedException if failed. |
| */ |
| @SuppressWarnings("unchecked") |
| private List<QueryCursorImpl<List<?>>> executeUpdateDistributed( |
| long qryId, |
| QueryDescriptor qryDesc, |
| QueryParameters qryParams, |
| QueryParserResultDml dml, |
| GridQueryCancel cancel |
| ) throws IgniteCheckedException { |
| if (qryDesc.batched()) { |
| Collection<UpdateResult> ress; |
| |
| List<Object[]> argss = qryParams.batchedArguments(); |
| |
| UpdatePlan plan = dml.plan(); |
| |
| GridCacheContext<?, ?> cctx = plan.cacheContext(); |
| |
| // For MVCC case, let's enlist batch elements one by one. |
| if (plan.hasRows() && plan.mode() == UpdateMode.INSERT) { |
| CacheOperationContext opCtx = DmlUtils.setKeepBinaryContext(cctx); |
| |
| try { |
| List<List<List<?>>> cur = plan.createRows(argss); |
| |
| //TODO: IGNITE-11176 - Need to support cancellation |
| ress = DmlUtils.processSelectResultBatched(plan, cur, qryParams.updateBatchSize()); |
| } |
| finally { |
| DmlUtils.restoreKeepBinaryContext(cctx, opCtx); |
| } |
| } |
| else { |
| // Fallback to previous mode. |
| ress = new ArrayList<>(argss.size()); |
| |
| SQLException batchEx = null; |
| |
| int[] cntPerRow = new int[argss.size()]; |
| |
| int cntr = 0; |
| |
| for (Object[] args : argss) { |
| UpdateResult res; |
| |
| try { |
| res = executeUpdate( |
| qryId, |
| qryDesc, |
| qryParams.toSingleBatchedArguments(args), |
| dml, |
| false, |
| null, |
| cancel |
| ); |
| |
| cntPerRow[cntr++] = (int)res.counter(); |
| |
| ress.add(res); |
| } |
| catch (Exception e ) { |
| SQLException sqlEx = QueryUtils.toSqlException(e); |
| |
| batchEx = DmlUtils.chainException(batchEx, sqlEx); |
| |
| cntPerRow[cntr++] = Statement.EXECUTE_FAILED; |
| } |
| } |
| |
| if (batchEx != null) { |
| BatchUpdateException e = new BatchUpdateException(batchEx.getMessage(), |
| batchEx.getSQLState(), batchEx.getErrorCode(), cntPerRow, batchEx); |
| |
| throw new IgniteCheckedException(e); |
| } |
| } |
| |
| ArrayList<QueryCursorImpl<List<?>>> resCurs = new ArrayList<>(ress.size()); |
| |
| for (UpdateResult res : ress) { |
| res.throwIfError(); |
| |
| QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(singletonList( |
| singletonList(res.counter())), cancel, false, false); |
| |
| resCur.fieldsMeta(UPDATE_RESULT_META); |
| |
| resCurs.add(resCur); |
| } |
| |
| return resCurs; |
| } |
| else { |
| UpdateResult res = executeUpdate( |
| qryId, |
| qryDesc, |
| qryParams, |
| dml, |
| false, |
| null, |
| cancel |
| ); |
| |
| res.throwIfError(); |
| |
| QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(singletonList( |
| singletonList(res.counter())), cancel, false, false); |
| |
| resCur.fieldsMeta(UPDATE_RESULT_META); |
| |
| resCur.partitionResult(res.partitionResult()); |
| |
| return singletonList(resCur); |
| } |
| } |
| |
| /** |
| * Execute DML statement, possibly with few re-attempts in case of concurrent data modifications. |
| * |
| * @param qryId Query id. |
| * @param qryDesc Query descriptor. |
| * @param qryParams Query parameters. |
| * @param dml DML command. |
| * @param loc Query locality flag. |
| * @param filters Cache name and key filter. |
| * @param cancel Cancel. |
| * @return Update result (modified items count and failed keys). |
| * @throws IgniteCheckedException if failed. |
| */ |
| @SuppressWarnings("IfMayBeConditional") |
| private UpdateResult executeUpdate( |
| long qryId, |
| QueryDescriptor qryDesc, |
| QueryParameters qryParams, |
| QueryParserResultDml dml, |
| boolean loc, |
| IndexingQueryFilter filters, |
| GridQueryCancel cancel |
| ) throws IgniteCheckedException { |
| Object[] errKeys = null; |
| |
| long items = 0; |
| |
| PartitionResult partRes = null; |
| |
| GridCacheContext<?, ?> cctx = dml.plan().cacheContext(); |
| |
| for (int i = 0; i < DFLT_UPDATE_RERUN_ATTEMPTS; i++) { |
| CacheOperationContext opCtx = cctx != null ? DmlUtils.setKeepBinaryContext(cctx) : null; |
| |
| UpdateResult r; |
| |
| try { |
| r = executeUpdate0( |
| qryId, |
| qryDesc, |
| qryParams, |
| dml, |
| loc, |
| filters, |
| cancel |
| ); |
| } |
| finally { |
| if (opCtx != null) |
| DmlUtils.restoreKeepBinaryContext(cctx, opCtx); |
| } |
| |
| items += r.counter(); |
| errKeys = r.errorKeys(); |
| partRes = r.partitionResult(); |
| |
| if (F.isEmpty(errKeys)) |
| break; |
| } |
| |
| if (F.isEmpty(errKeys) && partRes == null) { |
| if (items == 1L) |
| return UpdateResult.ONE; |
| else if (items == 0L) |
| return UpdateResult.ZERO; |
| } |
| |
| return new UpdateResult(items, errKeys, partRes); |
| } |
| |
| /** |
| * Execute update. |
| * |
| * @param qryId Query id. |
| * @param qryDesc Query descriptor. |
| * @param qryParams Query parameters. |
| * @param dml Plan. |
| * @param loc Local flag. |
| * @param filters Filters. |
| * @param cancel Cancel hook. |
| * @return Update result. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private UpdateResult executeUpdate0( |
| long qryId, |
| QueryDescriptor qryDesc, |
| QueryParameters qryParams, |
| QueryParserResultDml dml, |
| boolean loc, |
| IndexingQueryFilter filters, |
| GridQueryCancel cancel |
| ) throws IgniteCheckedException { |
| UpdatePlan plan = dml.plan(); |
| |
| UpdateResult fastUpdateRes = plan.processFast(qryParams.arguments()); |
| |
| if (fastUpdateRes != null) |
| return fastUpdateRes; |
| |
| DmlDistributedPlanInfo distributedPlan = loc ? null : plan.distributedPlan(); |
| |
| if (distributedPlan != null) { |
| if (cancel == null) |
| cancel = new GridQueryCancel(); |
| |
| UpdateResult result = rdcQryExec.update( |
| qryDesc.schemaName(), |
| distributedPlan.getCacheIds(), |
| qryDesc.sql(), |
| qryParams.arguments(), |
| qryDesc.enforceJoinOrder(), |
| qryParams.pageSize(), |
| qryParams.timeout(), |
| qryParams.partitions(), |
| distributedPlan.isReplicatedOnly(), |
| cancel |
| ); |
| |
| // Null is returned in case not all nodes support distributed DML. |
| if (result != null) |
| return result; |
| } |
| |
| final GridQueryCancel selectCancel = (cancel != null) ? new GridQueryCancel() : null; |
| |
| if (cancel != null) |
| cancel.add(selectCancel::cancel); |
| |
| SqlFieldsQuery selectFieldsQry = new SqlFieldsQuery(plan.selectQuery(), qryDesc.collocated()) |
| .setArgs(qryParams.arguments()) |
| .setDistributedJoins(qryDesc.distributedJoins()) |
| .setEnforceJoinOrder(qryDesc.enforceJoinOrder()) |
| .setLocal(qryDesc.local()) |
| .setPageSize(qryParams.pageSize()) |
| .setTimeout(qryParams.timeout(), TimeUnit.MILLISECONDS) |
| // We cannot use lazy mode when UPDATE query contains updated columns |
| // in WHERE condition because it may be cause of update one entry several times |
| // (when index for such columns is selected for scan): |
| // e.g. : UPDATE test SET val = val + 1 WHERE val >= ? |
| .setLazy(qryParams.lazy() && plan.canSelectBeLazy()); |
| |
| Iterable<List<?>> cur; |
| |
| // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual |
| // sub-query and not some dummy stuff like "select 1, 2, 3;" |
| if (!loc && !plan.isLocalSubquery()) { |
| assert !F.isEmpty(plan.selectQuery()); |
| |
| cur = executeSelectForDml( |
| qryId, |
| qryDesc.schemaName(), |
| selectFieldsQry, |
| selectCancel, |
| qryParams.timeout() |
| ); |
| } |
| else if (plan.hasRows()) |
| cur = plan.createRows(qryParams.arguments()); |
| else { |
| selectFieldsQry.setLocal(true); |
| |
| QueryParserResult selectParseRes = parser.parse(qryDesc.schemaName(), selectFieldsQry, false); |
| |
| final GridQueryFieldsResult res = executeSelectLocal( |
| qryId, |
| selectParseRes.queryDescriptor(), |
| selectParseRes.queryParameters(), |
| selectParseRes.select(), |
| filters, |
| selectCancel, |
| qryParams.timeout() |
| ); |
| |
| cur = new QueryCursorImpl<>(new Iterable<List<?>>() { |
| @Override public Iterator<List<?>> iterator() { |
| try { |
| return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), true); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| }, cancel, true, qryParams.lazy()); |
| } |
| |
| int pageSize = qryParams.updateBatchSize(); |
| |
| // TODO: IGNITE-11176 - Need to support cancellation |
| try { |
| return DmlUtils.processSelectResult(plan, cur, pageSize); |
| } |
| finally { |
| if (cur instanceof AutoCloseable) |
| U.closeQuiet((AutoCloseable)cur); |
| } |
| } |
| |
| /** |
| * @return Heavy queries tracker. |
| */ |
| public HeavyQueriesTracker heavyQueriesTracker() { |
| return heavyQryTracker; |
| } |
| |
| /** |
| * @return Distributed SQL configuration. |
| */ |
| public DistributedIndexingConfiguration distributedConfiguration() { |
| return distrCfg; |
| } |
| } |