| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.query; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.IdentityHashMap; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| import java.util.stream.Collectors; |
| import javax.cache.Cache; |
| import javax.cache.CacheException; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteDataStreamer; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.binary.BinaryObject; |
| import org.apache.ignite.binary.BinaryObjectException; |
| import org.apache.ignite.binary.BinaryType; |
| import org.apache.ignite.binary.Binarylizable; |
| import org.apache.ignite.cache.CacheAtomicityMode; |
| import org.apache.ignite.cache.CacheKeyConfiguration; |
| import org.apache.ignite.cache.CacheMode; |
| import org.apache.ignite.cache.CacheWriteSynchronizationMode; |
| import org.apache.ignite.cache.QueryEntity; |
| import org.apache.ignite.cache.QueryIndex; |
| import org.apache.ignite.cache.query.FieldsQueryCursor; |
| import org.apache.ignite.cache.query.QueryCursor; |
| import org.apache.ignite.cache.query.SqlFieldsQuery; |
| import org.apache.ignite.cache.query.SqlQuery; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.QueryEngineConfiguration; |
| import org.apache.ignite.events.CacheQueryExecutedEvent; |
| import org.apache.ignite.internal.GridComponent; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.NodeStoppingException; |
| import org.apache.ignite.internal.binary.BinaryMetadata; |
| import org.apache.ignite.internal.cache.query.index.IndexProcessor; |
| import org.apache.ignite.internal.cache.query.index.IndexQueryProcessor; |
| import org.apache.ignite.internal.cache.query.index.IndexQueryResult; |
| import org.apache.ignite.internal.cache.query.index.sorted.maintenance.RebuildIndexWorkflowCallback; |
| import org.apache.ignite.internal.managers.communication.GridMessageListener; |
| import org.apache.ignite.internal.processors.GridProcessorAdapter; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.CacheObject; |
| import org.apache.ignite.internal.processors.cache.CacheObjectContext; |
| import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; |
| import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; |
| import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; |
| import org.apache.ignite.internal.processors.cache.ExchangeActions; |
| import org.apache.ignite.internal.processors.cache.GridCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheContextInfo; |
| import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObject; |
| import org.apache.ignite.internal.processors.cache.QueryCursorImpl; |
| import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; |
| import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; |
| import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; |
| import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; |
| import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; |
| import org.apache.ignite.internal.processors.cache.query.IndexQueryDesc; |
| import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; |
| import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; |
| import org.apache.ignite.internal.processors.odbc.jdbc.JdbcParameterMeta; |
| import org.apache.ignite.internal.processors.platform.PlatformContext; |
| import org.apache.ignite.internal.processors.platform.PlatformProcessor; |
| import org.apache.ignite.internal.processors.query.aware.IndexBuildStatusStorage; |
| import org.apache.ignite.internal.processors.query.aware.IndexRebuildFutureStorage; |
| import org.apache.ignite.internal.processors.query.property.QueryBinaryProperty; |
| import org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo; |
| import org.apache.ignite.internal.processors.query.running.RunningQueryManager; |
| import org.apache.ignite.internal.processors.query.schema.IndexRebuildCancelToken; |
| import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; |
| import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure; |
| import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl; |
| import org.apache.ignite.internal.processors.query.schema.SchemaOperationClientFuture; |
| import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; |
| import org.apache.ignite.internal.processors.query.schema.SchemaOperationManager; |
| import org.apache.ignite.internal.processors.query.schema.SchemaOperationWorker; |
| import org.apache.ignite.internal.processors.query.schema.management.SchemaManager; |
| import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage; |
| import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage; |
| import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage; |
| import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage; |
| import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation; |
| import org.apache.ignite.internal.processors.query.schema.operation.SchemaAddQueryEntityOperation; |
| import org.apache.ignite.internal.processors.query.schema.operation.SchemaAlterTableAddColumnOperation; |
| import org.apache.ignite.internal.processors.query.schema.operation.SchemaAlterTableDropColumnOperation; |
| import org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexCreateOperation; |
| import org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexDropOperation; |
| import org.apache.ignite.internal.processors.query.stat.IgniteStatisticsManager; |
| import org.apache.ignite.internal.processors.query.stat.IgniteStatisticsManagerImpl; |
| import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; |
| import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet; |
| import org.apache.ignite.internal.util.GridSpinBusyLock; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.lang.GridCloseableIterator; |
| import org.apache.ignite.internal.util.lang.GridClosureException; |
| import org.apache.ignite.internal.util.lang.GridPlainOutClosure; |
| import org.apache.ignite.internal.util.lang.IgniteOutClosureX; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.T2; |
| import org.apache.ignite.internal.util.typedef.T3; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.LT; |
| import org.apache.ignite.internal.util.typedef.internal.SB; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiPredicate; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgniteFuture; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.marshaller.jdk.JdkMarshaller; |
| import org.apache.ignite.spi.discovery.DiscoveryDataBag; |
| import org.apache.ignite.spi.indexing.IndexingQueryFilter; |
| import org.apache.ignite.thread.IgniteThread; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static java.util.Collections.emptySet; |
| import static java.util.Collections.newSetFromMap; |
| import static java.util.Collections.singleton; |
| import static java.util.Objects.isNull; |
| import static java.util.Objects.nonNull; |
| import static java.util.regex.Pattern.CASE_INSENSITIVE; |
| import static java.util.stream.Collectors.toSet; |
| import static org.apache.ignite.cache.CacheMode.PARTITIONED; |
| import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; |
| import static org.apache.ignite.internal.GridTopic.TOPIC_SCHEMA; |
| import static org.apache.ignite.internal.IgniteComponentType.INDEXING; |
| import static org.apache.ignite.internal.binary.BinaryUtils.fieldTypeName; |
| import static org.apache.ignite.internal.binary.BinaryUtils.typeByClass; |
| import static org.apache.ignite.internal.cache.query.index.sorted.maintenance.MaintenanceRebuildIndexUtils.INDEX_REBUILD_MNTC_TASK_NAME; |
| import static org.apache.ignite.internal.cache.query.index.sorted.maintenance.MaintenanceRebuildIndexUtils.parseMaintenanceTaskParameters; |
| import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SCHEMA_POOL; |
| import static org.apache.ignite.internal.processors.query.schema.SchemaOperationException.CODE_COLUMN_EXISTS; |
| |
| /** |
| * Indexing processor. |
| */ |
| @SuppressWarnings("rawtypes") |
| public class GridQueryProcessor extends GridProcessorAdapter { |
| /** */ |
| private static final String INLINE_SIZES_DISCO_BAG_KEY = "inline_sizes"; |
| |
| /** Warning message used when some indexes have different inline sizes across nodes. */ |
| public static final String INLINE_SIZES_DIFFER_WARN_MSG_FORMAT = "Inline sizes on local node and node %s are different. " + |
| "Please drop and recreate these indexes to avoid performance problems with SQL queries. Problem indexes: %s"; |
| |
| /** Queries detail metrics eviction frequency. */ |
| private static final int QRY_DETAIL_METRICS_EVICTION_FREQ = 3_000; |
| |
| /** Pattern of query hint. */ |
| public static final Pattern QRY_HINT_PATTERN = Pattern.compile("/\\*\\+((?:.|[\\n\\r])*?)\\*/"); |
| |
| /** Pattern of hint to choose query engine. */ |
| public static final Pattern QRY_ENGINE_PATTERN = |
| Pattern.compile("QUERY_ENGINE[\\s]*\\([\\s]*'([a-z0-9]+)'[\\s]*\\)", |
| CASE_INSENSITIVE); |
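| // Illustrative sketch (not from the original source): QRY_ENGINE_PATTERN is matched inside a |
| // hint block captured by QRY_HINT_PATTERN at the start of a query, so a statement such as the |
| // following (the engine name 'h2' is an assumption) selects the engine explicitly: |
| //   SELECT /*+ QUERY_ENGINE('h2') */ id, name FROM Person; |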
| |
| /** */ |
| private static final ThreadLocal<AffinityTopologyVersion> requestTopVer = new ThreadLocal<>(); |
| |
| /** For tests. */ |
| public static Class<? extends GridQueryIndexing> idxCls; |
| |
| /** JDK marshaller to serialize errors. */ |
| private final JdkMarshaller marsh = new JdkMarshaller(); |
| |
| /** */ |
| private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); |
| |
| /** */ |
| private GridTimeoutProcessor.CancelableTask qryDetailMetricsEvictTask; |
| |
| /** Type descriptors. */ |
| private final Map<QueryTypeIdKey, QueryTypeDescriptorImpl> types = new ConcurrentHashMap<>(); |
| |
| /** Type descriptors accessed by name. */ |
| private final ConcurrentMap<QueryTypeNameKey, QueryTypeDescriptorImpl> typesByName = new ConcurrentHashMap<>(); |
| |
| /** */ |
| private final @Nullable GridQueryIndexing idx; |
| |
| /** Indexing manager. */ |
| private final IndexProcessor idxProc; |
| |
| /** Processor to run IndexQuery. */ |
| private final IndexQueryProcessor idxQryPrc; |
| |
| /** Value object context. */ |
| private final CacheQueryObjectValueContext valCtx; |
| |
| /** All indexes. */ |
| private final ConcurrentMap<QueryIndexKey, QueryIndexDescriptorImpl> idxs = new ConcurrentHashMap<>(); |
| |
| /** Schema operation futures created on client side. */ |
| private final ConcurrentMap<UUID, SchemaOperationClientFuture> schemaCliFuts = new ConcurrentHashMap<>(); |
| |
| /** IO message listener. */ |
| private final GridMessageListener ioLsnr; |
| |
| /** Schema operations. */ |
| private final ConcurrentHashMap<String, SchemaOperation> schemaOps = new ConcurrentHashMap<>(); |
| |
| /** Active propose messages. */ |
| private final LinkedHashMap<UUID, SchemaProposeDiscoveryMessage> activeProposals = new LinkedHashMap<>(); |
| |
| /** General state mutex. */ |
| private final Object stateMux = new Object(); |
| |
| /** Coordinator node (initialized lazily). */ |
| private ClusterNode crd; |
| |
| /** Registered cache names. */ |
| private final Collection<String> cacheNames = ConcurrentHashMap.newKeySet(); |
| |
| /** ID history for index create/drop discovery messages. */ |
| private final GridBoundedConcurrentLinkedHashSet<IgniteUuid> dscoMsgIdHist = |
| new GridBoundedConcurrentLinkedHashSet<>(QueryUtils.discoveryHistorySize()); |
| |
| /** History of already completed operations. */ |
| private final GridBoundedConcurrentLinkedHashSet<UUID> completedOpIds = |
| new GridBoundedConcurrentLinkedHashSet<>(QueryUtils.discoveryHistorySize()); |
| |
| /** Pending status messages. */ |
| private final LinkedList<SchemaOperationStatusMessage> pendingMsgs = new LinkedList<>(); |
| |
| /** Current cache that has a query running on it. */ |
| private final ThreadLocal<GridCacheContext> curCache = new ThreadLocal<>(); |
| |
| /** Disconnected flag. */ |
| private boolean disconnected; |
| |
| /** Whether exchange thread is ready to process further requests. */ |
| private boolean exchangeReady; |
| |
| /** */ |
| private boolean skipFieldLookup; |
| |
| /** Cache name and value typeId pairs for which a type mismatch message was already logged. */ |
| private final Set<Long> missedCacheTypes = ConcurrentHashMap.newKeySet(); |
| |
| /** Index rebuild futures. */ |
| private final IndexRebuildFutureStorage idxRebuildFutStorage = new IndexRebuildFutureStorage(); |
| |
| /** Index build statuses. */ |
| private final IndexBuildStatusStorage idxBuildStatusStorage; |
| |
| /** Statistic manager. */ |
| private IgniteStatisticsManager statsMgr; |
| |
| /** Default query engine. */ |
| private QueryEngine dfltQryEngine; |
| |
| /** All available query engines. */ |
| private QueryEngine[] qryEngines; |
| |
| /** Query engines configuration. */ |
| private QueryEngineConfigurationEx[] qryEnginesCfg; |
| |
| /** Running query manager. */ |
| private RunningQueryManager runningQryMgr; |
| |
| /** Schema manager. */ |
| private final SchemaManager schemaMgr; |
| |
| /** |
| * Constructor. |
| * |
| * @param ctx Kernal context. |
| */ |
| public GridQueryProcessor(GridKernalContext ctx) throws IgniteCheckedException { |
| super(ctx); |
| |
| if (idxCls != null) { |
| idx = U.newInstance(idxCls); |
| |
| idxCls = null; |
| } |
| else |
| idx = INDEXING.inClassPath() ? U.newInstance(INDEXING.className()) : null; |
| |
| schemaMgr = new SchemaManager(ctx); |
| |
| idxProc = ctx.indexProcessor(); |
| |
| idxQryPrc = new IndexQueryProcessor(idxProc); |
| |
| valCtx = new CacheQueryObjectValueContext(ctx); |
| |
| ioLsnr = (nodeId, msg, plc) -> { |
| if (msg instanceof SchemaOperationStatusMessage) { |
| SchemaOperationStatusMessage msg0 = (SchemaOperationStatusMessage)msg; |
| |
| msg0.senderNodeId(nodeId); |
| |
| processStatusMessage(msg0); |
| } |
| else |
| U.warn(log, "Unsupported IO message: " + msg); |
| }; |
| |
| initQueryEngines(); |
| |
| idxBuildStatusStorage = new IndexBuildStatusStorage(ctx); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void start() throws IgniteCheckedException { |
| super.start(); |
| |
| runningQryMgr = new RunningQueryManager(ctx); |
| runningQryMgr.start(busyLock); |
| |
| if (idx != null) { |
| ctx.resource().injectGeneric(idx); |
| |
| idx.start(ctx, busyLock); |
| } |
| |
| statsMgr = new IgniteStatisticsManagerImpl(ctx); |
| |
| schemaMgr.start(ctx.config().getSqlConfiguration().getSqlSchemas()); |
| |
| ctx.io().addMessageListener(TOPIC_SCHEMA, ioLsnr); |
| |
| // Schedule queries detail metrics eviction. |
| qryDetailMetricsEvictTask = ctx.timeout().schedule(() -> { |
| for (GridCacheContext ctxs : ctx.cache().context().cacheContexts()) |
| ctxs.queries().evictDetailMetrics(); |
| }, QRY_DETAIL_METRICS_EVICTION_FREQ, QRY_DETAIL_METRICS_EVICTION_FREQ); |
| |
| ctx.maintenanceRegistry().registerWorkflowCallbackIfTaskExists( |
| INDEX_REBUILD_MNTC_TASK_NAME, |
| task -> new RebuildIndexWorkflowCallback(parseMaintenanceTaskParameters(task.parameters()), ctx) |
| ); |
| |
| idxBuildStatusStorage.start(); |
| |
| registerMetadataForRegisteredCaches(false); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onKernalStop(boolean cancel) { |
| super.onKernalStop(cancel); |
| |
| if (cancel && idx != null) { |
| try { |
| while (!busyLock.tryBlock(500)) |
| idx.onKernalStop(); |
| |
| return; |
| } |
| catch (InterruptedException ignored) { |
| U.warn(log, "Interrupted while waiting for active queries cancellation."); |
| |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| busyLock.block(); |
| |
| idxBuildStatusStorage.stop(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void stop(boolean cancel) throws IgniteCheckedException { |
| super.stop(cancel); |
| |
| ctx.io().removeMessageListener(TOPIC_SCHEMA, ioLsnr); |
| |
| if (idx != null) |
| idx.stop(); |
| |
| runningQryMgr.stop(); |
| schemaMgr.stop(); |
| statsMgr.stop(); |
| |
| U.closeQuiet(qryDetailMetricsEvictTask); |
| } |
| |
| /** |
| * Handle cache kernal start. At this point discovery and IO managers are operational, caches are not started yet. |
| * |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void onCacheKernalStart() throws IgniteCheckedException { |
| synchronized (stateMux) { |
| exchangeReady = true; |
| |
| // Re-run pending top-level proposals. |
| for (SchemaOperation schemaOp : schemaOps.values()) |
| onSchemaPropose(schemaOp.proposeMessage()); |
| } |
| |
| idxBuildStatusStorage.onCacheKernalStart(); |
| } |
| |
| /** |
| * Handle cache reconnect. |
| * |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void onCacheReconnect() throws IgniteCheckedException { |
| synchronized (stateMux) { |
| assert disconnected; |
| |
| disconnected = false; |
| |
| onCacheKernalStart(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() { |
| return DiscoveryDataExchangeType.QUERY_PROC; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { |
| LinkedHashMap<UUID, SchemaProposeDiscoveryMessage> proposals; |
| |
| // Collect active proposals. |
| synchronized (stateMux) { |
| proposals = new LinkedHashMap<>(activeProposals); |
| } |
| |
| dataBag.addGridCommonData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), proposals); |
| |
| // We should send inline index sizes information only to server nodes. |
| if (!dataBag.isJoiningNodeClient()) { |
| HashMap<String, Serializable> nodeSpecificMap = new HashMap<>(); |
| |
| Serializable oldVal = nodeSpecificMap.put(INLINE_SIZES_DISCO_BAG_KEY, collectSecondaryIndexesInlineSize()); |
| |
| assert oldVal == null : oldVal; |
| |
| dataBag.addNodeSpecificData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), nodeSpecificMap); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) { |
| if (data.hasJoiningNodeData() && data.joiningNodeData() instanceof Map) { |
| Map<String, Serializable> nodeSpecificDataMap = (Map<String, Serializable>)data.joiningNodeData(); |
| |
| if (nodeSpecificDataMap.containsKey(INLINE_SIZES_DISCO_BAG_KEY)) { |
| Serializable serializable = nodeSpecificDataMap.get(INLINE_SIZES_DISCO_BAG_KEY); |
| |
| assert serializable instanceof Map : serializable; |
| |
| Map<String, Integer> joiningNodeIndexesInlineSize = (Map<String, Integer>)serializable; |
| |
| checkInlineSizes(secondaryIndexesInlineSize(), joiningNodeIndexesInlineSize, data.joiningNodeId()); |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) { |
| HashMap<String, Serializable> dataMap = new HashMap<>(); |
| |
| dataMap.put(INLINE_SIZES_DISCO_BAG_KEY, collectSecondaryIndexesInlineSize()); |
| |
| dataBag.addJoiningNodeData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), dataMap); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { |
| // Preserve proposals. |
| LinkedHashMap<UUID, SchemaProposeDiscoveryMessage> activeProposals = |
| (LinkedHashMap<UUID, SchemaProposeDiscoveryMessage>)data.commonData(); |
| |
| // Process proposals as if they were received as regular discovery messages. |
| if (!F.isEmpty(activeProposals)) { |
| synchronized (stateMux) { |
| for (SchemaProposeDiscoveryMessage activeProposal : activeProposals.values()) |
| onSchemaProposeDiscovery0(activeProposal); |
| } |
| } |
| |
| if (!F.isEmpty(data.nodeSpecificData())) { |
| Map<String, Integer> indexesInlineSize = secondaryIndexesInlineSize(); |
| |
| if (!F.isEmpty(indexesInlineSize)) { |
| for (UUID nodeId : data.nodeSpecificData().keySet()) { |
| Serializable serializable = data.nodeSpecificData().get(nodeId); |
| |
| assert serializable instanceof Map : serializable; |
| |
| Map<String, Serializable> nodeSpecificData = (Map<String, Serializable>)serializable; |
| |
| if (nodeSpecificData.containsKey(INLINE_SIZES_DISCO_BAG_KEY)) |
| checkInlineSizes(indexesInlineSize, (Map<String, Integer>)nodeSpecificData.get(INLINE_SIZES_DISCO_BAG_KEY), nodeId); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Prepare index rebuild futures if needed before exchange. |
| * |
| * @param fut Exchange future. |
| */ |
| public void beforeExchange(GridDhtPartitionsExchangeFuture fut) { |
| Set<Integer> cacheIds = rebuildIndexCacheIds(fut); |
| |
| Set<Integer> rejected = idxRebuildFutStorage.prepareRebuildIndexes(cacheIds, fut.initialVersion()); |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Preparing features of rebuilding indexes for caches on exchange [requested=" + cacheIds + |
| ", rejected=" + rejected + ']'); |
| } |
| } |
| |
| /** |
| * Init query engines from the configuration. |
| */ |
| private void initQueryEngines() throws IgniteCheckedException { |
| boolean hasIdxCfg = false; |
| |
| int dfltQryEngineIdx = -1; |
| |
| QueryEngineConfiguration[] qryEnginesCfg = ctx.config().getSqlConfiguration().getQueryEnginesConfiguration(); |
| |
| if (F.isEmpty(qryEnginesCfg)) { |
| // No query engines explicitly configured - indexing will be used. |
| // If indexing is disabled, try to find any query engine in components. |
| if (!indexingEnabled()) { |
| for (GridComponent cmp : ctx.components()) { |
| if (cmp instanceof QueryEngine) { |
| qryEngines = new QueryEngine[] {(QueryEngine)cmp}; |
| dfltQryEngine = (QueryEngine)cmp; |
| } |
| } |
| } |
| |
| return; |
| } |
| |
| this.qryEnginesCfg = new QueryEngineConfigurationEx[qryEnginesCfg.length]; |
| |
| qryEngines = new QueryEngine[qryEnginesCfg.length]; |
| |
| for (int i = 0; i < qryEnginesCfg.length; i++) { |
| QueryEngineConfiguration qryEngineCfg = qryEnginesCfg[i]; |
| |
| if (!(qryEngineCfg instanceof QueryEngineConfigurationEx)) |
| throw new IgniteCheckedException("Unsupported query engine configuration: " + qryEngineCfg.getClass()); |
| |
| QueryEngineConfigurationEx qryEngineCfgEx = (QueryEngineConfigurationEx)qryEngineCfg; |
| |
| this.qryEnginesCfg[i] = qryEngineCfgEx; |
| |
| Class<? extends QueryEngine> qryEngineCls = qryEngineCfgEx.engineClass(); |
| |
| // Check for duplicates. |
| for (int j = 0; j < i; j++) { |
| if (this.qryEnginesCfg[j].engineClass() == qryEngineCls) |
| throw new IgniteCheckedException("Only one instance of each query engine can be set"); |
| } |
| |
| QueryEngine qryEngine = null; |
| |
| if (qryEngineCls == IndexingQueryEngine.class) |
| hasIdxCfg = true; |
| else { |
| for (GridComponent cmp : ctx.components()) { |
| if (!(cmp instanceof QueryEngine && cmp.getClass() == qryEngineCls)) |
| continue; |
| |
| qryEngine = (QueryEngine)cmp; |
| |
| break; |
| } |
| |
| if (qryEngine == null) |
| throw new IgniteCheckedException("Can't find query engine for class " + qryEngineCls); |
| |
| qryEngines[i] = qryEngine; |
| } |
| |
| if (qryEngineCfgEx.isDefault()) { |
| if (dfltQryEngineIdx >= 0) |
| throw new IgniteCheckedException("Only one query engine can be set as default"); |
| |
| dfltQryEngineIdx = i; |
| } |
| } |
| |
| // No query engine is set as default: the indexing engine is used by default; if there is no |
| // configuration for the indexing engine, the first configured engine is used as the default. |
| if (dfltQryEngineIdx < 0) { |
| if (!hasIdxCfg) |
| dfltQryEngine = qryEngines[0]; |
| } |
| else |
| dfltQryEngine = qryEngines[dfltQryEngineIdx]; |
| } |
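| // Minimal configuration sketch matching the logic above (engine classes come from the |
| // ignite-indexing and ignite-calcite modules; treat their availability as an assumption): |
| //   SqlConfiguration sqlCfg = new SqlConfiguration().setQueryEnginesConfiguration( |
| //       new IndexingQueryEngineConfiguration(), |
| //       new CalciteQueryEngineConfiguration().setDefault(true)); |
| //   igniteCfg.setSqlConfiguration(sqlCfg); // igniteCfg is an IgniteConfiguration |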
| |
| /** |
| * @return Information about secondary indexes inline size. The key is a full index name, the value is an effective inline size. |
| * @see IndexProcessor#secondaryIndexesInlineSize() |
| */ |
| public Map<String, Integer> secondaryIndexesInlineSize() { |
| return moduleEnabled() ? ctx.indexProcessor().secondaryIndexesInlineSize() : Collections.emptyMap(); |
| } |
| |
| /** |
| * Compares index inline sizes on the remote and local nodes and logs a warning if a difference is found. |
| * |
| * @param local Information about indexes inline size on local node. |
| * @param remote Information about indexes inline size on remote node. |
| * @param remoteNodeId Remote node id. |
| */ |
| private void checkInlineSizes(Map<String, Integer> local, Map<String, Integer> remote, UUID remoteNodeId) { |
| if (log.isDebugEnabled()) |
| log.debug("Check inline sizes on remote node with node id: " + remoteNodeId + ". Local: " + local + ", remote: " + remote); |
| |
| if (F.isEmpty(local) || F.isEmpty(remote)) |
| return; |
| |
| SB sb = new SB(); |
| |
| for (String idxFullname : local.keySet()) { |
| if (remote.containsKey(idxFullname)) { |
| int locInlineSize = local.get(idxFullname); |
| int remoteInlineSize = remote.get(idxFullname); |
| |
| if (locInlineSize != remoteInlineSize) |
| sb.a(idxFullname).a("(").a(locInlineSize).a(",").a(remoteInlineSize).a(")").a(","); |
| } |
| } |
| |
| if (sb.length() > 0) { |
| sb.setLength(sb.length() - 1); |
| |
| log.warning(String.format(INLINE_SIZES_DIFFER_WARN_MSG_FORMAT, remoteNodeId, sb)); |
| } |
| } |
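| // Worked example of the check above (index name is illustrative): with local = |
| // {PUBLIC.PERSON_NAME_IDX=10} and remote = {PUBLIC.PERSON_NAME_IDX=15}, the warning is |
| // formatted with "PUBLIC.PERSON_NAME_IDX(10,15)" as the problem-index list. |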
| |
| /** |
| * @return Serializable information about secondary indexes inline size. |
| * @see #secondaryIndexesInlineSize() |
| */ |
| private Serializable collectSecondaryIndexesInlineSize() { |
| Map<String, Integer> map = secondaryIndexesInlineSize(); |
| |
| return map instanceof Serializable ? (Serializable)map : new HashMap<>(map); |
| } |
| |
| /** |
| * Process schema propose message from discovery thread. |
| * |
| * @param msg Message. |
| * @return {@code True} if exchange should be triggered. |
| */ |
| private boolean onSchemaProposeDiscovery(SchemaProposeDiscoveryMessage msg) { |
| SchemaAbstractOperation op = msg.operation(); |
| |
| UUID opId = op.id(); |
| String cacheName = op.cacheName(); |
| |
| if (!msg.initialized()) { |
| // Ensure cache exists on coordinator node. |
| DynamicCacheDescriptor cacheDesc = ctx.cache().cacheDescriptor(cacheName); |
| |
| if (cacheDesc == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Received schema propose discovery message, but cache doesn't exist " + |
| "(will report error) [opId=" + opId + ", msg=" + msg + ']'); |
| |
| msg.onError(new SchemaOperationException(SchemaOperationException.CODE_CACHE_NOT_FOUND, cacheName)); |
| } |
| else { |
| CacheConfiguration ccfg = cacheDesc.cacheConfiguration(); |
| |
| if (failOnStaticCacheSchemaChanges(cacheDesc)) { |
| // Do not allow any schema changes when keep static cache configuration flag is set. |
| if (log.isDebugEnabled()) |
| log.debug("Received schema propose discovery message, but cache is statically configured " + |
| "and " + IgniteSystemProperties.IGNITE_KEEP_STATIC_CACHE_CONFIGURATION + |
| " flag is set (will report error) [opId=" + opId + ", msg=" + msg + ']'); |
| |
| msg.onError(new SchemaOperationException("Schema changes are not supported for statically " + |
| "configured cache when " + IgniteSystemProperties.IGNITE_KEEP_STATIC_CACHE_CONFIGURATION + |
| " flag is set.")); |
| } |
| else { |
| // Preserve deployment ID so that we can distinguish between different caches with the same name. |
| if (msg.deploymentId() == null) |
| msg.deploymentId(cacheDesc.deploymentId()); |
| |
| assert F.eq(cacheDesc.deploymentId(), msg.deploymentId()); |
| |
| if (msg.operation() instanceof SchemaAlterTableAddColumnOperation) { |
| SchemaAlterTableAddColumnOperation alterOp = (SchemaAlterTableAddColumnOperation)msg.operation(); |
| |
| try { |
| for (QueryField field : alterOp.columns()) { |
| if (!field.isNullable()) |
| QueryUtils.checkNotNullAllowed(ccfg); |
| } |
| } |
| catch (IgniteSQLException ex) { |
| msg.onError(new SchemaOperationException("Received schema propose discovery message, but " + |
| "cache doesn't applicable for this modification", ex)); |
| } |
| } |
| } |
| } |
| } |
| |
| // Complete client future and exit immediately in case of error. |
| if (msg.hasError()) { |
| SchemaOperationClientFuture cliFut = schemaCliFuts.remove(opId); |
| |
| if (cliFut != null) |
| cliFut.onDone(msg.error()); |
| |
| return false; |
| } |
| |
| return onSchemaProposeDiscovery0(msg); |
| } |
| |
| /** |
| * @param cacheDesc Cache descriptor to check. |
| * @return {@code true} if cache is statically configured, IGNITE_KEEP_STATIC_CACHE_CONFIGURATION system property |
| * is set and cache is persistent, {@code false} otherwise. |
| */ |
| private boolean failOnStaticCacheSchemaChanges(DynamicCacheDescriptor cacheDesc) { |
| return cacheDesc.staticallyConfigured() && |
| ctx.cache().keepStaticCacheConfiguration() && |
| cacheDesc.groupDescriptor().persistenceEnabled(); |
| } |
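| // For reference: the guard above fires only when the node was started with |
| // -DIGNITE_KEEP_STATIC_CACHE_CONFIGURATION=true (or the property set programmatically) and |
| // the statically configured cache belongs to a persistence-enabled cache group. |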
| |
| /** |
| * Process schema propose message from discovery thread (or from cache start routine). |
| * |
| * @param msg Message. |
| * @return {@code True} if exchange should be triggered. |
| */ |
| private boolean onSchemaProposeDiscovery0(SchemaProposeDiscoveryMessage msg) { |
| UUID opId = msg.operation().id(); |
| |
| synchronized (stateMux) { |
| if (disconnected) { |
| if (log.isDebugEnabled()) |
| log.debug("Processing discovery schema propose message, but node is disconnected (will ignore) " + |
| "[opId=" + opId + ", msg=" + msg + ']'); |
| |
| return false; |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Processing discovery schema propose message [opId=" + opId + ", msg=" + msg + ']'); |
| |
| // Put message to active operations set. |
| SchemaProposeDiscoveryMessage oldDesc = activeProposals.put(msg.operation().id(), msg); |
| |
| assert oldDesc == null; |
| |
| // Create schema operation and either trigger it immediately from exchange thread or append to already |
| // running operation. |
| SchemaOperation schemaOp = new SchemaOperation(msg); |
| |
| String schemaName = msg.schemaName(); |
| |
| SchemaOperation prevSchemaOp = schemaOps.get(schemaName); |
| |
| if (prevSchemaOp != null) { |
| prevSchemaOp = prevSchemaOp.unwind(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Schema change is enqueued and will be executed after previous operation is completed " + |
| "[opId=" + opId + ", prevOpId=" + prevSchemaOp.id() + ']'); |
| |
| prevSchemaOp.next(schemaOp); |
| |
| return false; |
| } |
| else { |
| schemaOps.put(schemaName, schemaOp); |
| |
| return exchangeReady; |
| } |
| } |
| } |
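| // For example, two concurrent DDL statements on the same schema (say, two CREATE INDEX |
| // operations) are serialized here: the second proposal is chained via prevSchemaOp.next(...) |
| // and is started only after the first operation completes. |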
| |
| /** |
| * Handle schema propose from exchange thread. |
| * |
| * @param msg Discovery message. |
| */ |
| public void onSchemaPropose(SchemaProposeDiscoveryMessage msg) { |
| UUID opId = msg.operation().id(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Processing schema propose message (exchange) [opId=" + opId + ']'); |
| |
| synchronized (stateMux) { |
| if (disconnected) |
| return; |
| |
| SchemaOperation curOp = schemaOps.get(msg.schemaName()); |
| |
| assert curOp != null; |
| assert F.eq(opId, curOp.id()); |
| assert !curOp.started(); |
| |
| startSchemaChange(curOp); |
| } |
| } |
| |
| /** |
| * Process schema finish message from discovery thread. |
| * |
| * @param msg Message. |
| */ |
| private void onSchemaFinishDiscovery(SchemaFinishDiscoveryMessage msg) { |
| UUID opId = msg.operation().id(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Received schema finish message (discovery) [opId=" + opId + ", msg=" + msg + ']'); |
| |
| synchronized (stateMux) { |
| if (disconnected) |
| return; |
| |
| boolean completedOpAdded = completedOpIds.add(opId); |
| |
| assert completedOpAdded; |
| |
| // Remove propose message so that it will not be shared with joining nodes. |
| SchemaProposeDiscoveryMessage proposeMsg = activeProposals.remove(opId); |
| |
| assert proposeMsg != null; |
| |
| // Apply changes to public cache schema if operation is successful and original cache is still there. |
| if (!msg.hasError() && !msg.nop()) { |
| DynamicCacheDescriptor cacheDesc = ctx.cache().cacheDescriptor(msg.operation().cacheName()); |
| |
| if (cacheDesc != null && F.eq(cacheDesc.deploymentId(), proposeMsg.deploymentId())) { |
| cacheDesc.schemaChangeFinish(msg); |
| |
| try { |
| ctx.cache().saveCacheConfiguration(cacheDesc); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Error while saving cache configuration on disk, cfg = " |
| + cacheDesc.cacheConfiguration(), e); |
| } |
| } |
| } |
| |
| // Attach the propose message so that it can be used later from the exchange thread. |
| msg.proposeMessage(proposeMsg); |
| |
| if (exchangeReady) { |
| SchemaOperation op = schemaOps.get(proposeMsg.schemaName()); |
| |
| if (F.eq(op.id(), opId)) { |
| // Completed top operation. |
| op.finishMessage(msg); |
| |
| if (op.started()) |
| op.doFinish(); |
| } |
| else { |
| // Completed operation in the middle, will schedule completion later. |
| while (op != null) { |
| if (F.eq(op.id(), opId)) |
| break; |
| |
| op = op.next(); |
| } |
| |
| assert op != null; |
| assert !op.started(); |
| |
| op.finishMessage(msg); |
| } |
| } |
| else { |
| // Set next operation as top-level one. |
| String schemaName = proposeMsg.schemaName(); |
| |
| SchemaOperation op = schemaOps.remove(schemaName); |
| |
| assert op != null; |
| assert F.eq(op.id(), opId); |
| |
| // Chain to the next operation (if any). |
| SchemaOperation nextOp = op.next(); |
| |
| if (nextOp != null) |
| schemaOps.put(schemaName, nextOp); |
| } |
| |
| // Clean stale IO messages from just-joined nodes. |
| cleanStaleStatusMessages(opId); |
| } |
| } |
| |
| /** |
| * Initiate actual schema change operation. |
| * |
| * @param schemaOp Schema operation. |
| */ |
| private void startSchemaChange(SchemaOperation schemaOp) { |
| assert Thread.holdsLock(stateMux); |
| assert !schemaOp.started(); |
| |
| // Get current cache state. |
| SchemaProposeDiscoveryMessage msg = schemaOp.proposeMessage(); |
| |
| String cacheName = msg.operation().cacheName(); |
| |
| DynamicCacheDescriptor cacheDesc = ctx.cache().cacheDescriptor(cacheName); |
| |
| boolean cacheExists = cacheDesc != null && F.eq(msg.deploymentId(), cacheDesc.deploymentId()); |
| |
| boolean cacheRegistered = cacheExists && cacheNames.contains(cacheName); |
| |
| // Validate schema state and decide whether we should proceed or not. |
| SchemaAbstractOperation op = msg.operation(); |
| |
| QueryTypeDescriptorImpl type = null; |
| SchemaOperationException err; |
| |
| boolean nop = false; |
| |
| if (cacheExists) { |
| if (cacheRegistered) { |
| // If cache is started, we perform validation against real schema. |
| T3<QueryTypeDescriptorImpl, Boolean, SchemaOperationException> res = prepareChangeOnStartedCache(op); |
| |
| assert res.get2() != null; |
| |
| type = res.get1(); |
| nop = res.get2(); |
| err = res.get3(); |
| } |
| else { |
| T2<Boolean, SchemaOperationException> res = prepareChangeOnNotStartedCache(op, cacheDesc); |
| |
| assert res.get1() != null; |
| |
| type = null; |
| nop = res.get1(); |
| err = res.get2(); |
| } |
| } |
| else |
| err = new SchemaOperationException(SchemaOperationException.CODE_CACHE_NOT_FOUND, cacheName); |
| |
| // Start operation. |
| SchemaOperationWorker worker = |
| new SchemaOperationWorker(ctx, this, msg.deploymentId(), op, nop, err, cacheRegistered, type); |
| |
| SchemaOperationManager mgr = new SchemaOperationManager(ctx, this, worker, |
| ctx.clientNode() ? null : coordinator()); |
| |
| schemaOp.manager(mgr); |
| |
| mgr.start(); |
| |
| // Unwind pending IO messages. |
| if (!ctx.clientNode() && coordinator().isLocal()) |
| unwindPendingMessages(schemaOp.id(), mgr); |
| |
| // Schedule operation finish handling if needed. |
| if (schemaOp.hasFinishMessage()) |
| schemaOp.doFinish(); |
| } |
| |
| /** |
| * @return {@code true} if the indexing module is on the classpath and successfully initialized. |
| */ |
| public boolean indexingEnabled() { |
| return idx != null; |
| } |
| |
| /** |
| * @return {@code true} if the indexing module or any query engine is enabled. |
| */ |
| public boolean moduleEnabled() { |
| return indexingEnabled() || dfltQryEngine != null; |
| } |
| |
| /** |
| * @return Indexing. |
| * @throws IgniteException If module is not enabled. |
| */ |
| public GridQueryIndexing getIndexing() throws IgniteException { |
| checkxIndexingEnabled(); |
| |
| return idx; |
| } |
| |
| /** |
| * @return Running query manager. |
| * @throws IgniteException If module is not enabled. |
| */ |
| public RunningQueryManager runningQueryManager() throws IgniteException { |
| return runningQryMgr; |
| } |
| |
| /** |
| * Create type descriptors from schema and initialize indexing for given cache.<p> |
| * Use with {@link #busyLock} where appropriate. |
| * @param cacheInfo Cache context info. |
| * @param schema Initial schema. |
| * @param isSql {@code true} if the cache is created from SQL. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void onCacheStart0(GridCacheContextInfo<?, ?> cacheInfo, QuerySchema schema, boolean isSql) |
| throws IgniteCheckedException { |
| if (!cacheSupportSql(cacheInfo.config())) { |
| synchronized (stateMux) { |
| boolean proceed = false; |
| |
| for (SchemaAbstractDiscoveryMessage msg: activeProposals.values()) { |
| if (msg.operation() instanceof SchemaAddQueryEntityOperation) { |
| SchemaAddQueryEntityOperation op = (SchemaAddQueryEntityOperation)msg.operation(); |
| |
| if (op.cacheName().equals(cacheInfo.name())) { |
| proceed = true; |
| |
| break; |
| } |
| } |
| } |
| |
| if (!proceed) |
| return; |
| } |
| } |
| |
| ctx.cache().context().database().checkpointReadLock(); |
| |
| try { |
| String cacheName = cacheInfo.name(); |
| String schemaName = QueryUtils.normalizeSchemaName(cacheName, cacheInfo.config().getSqlSchema()); |
| |
| if (cacheInfo.isClientCache() && cacheInfo.isCacheContextInited() |
| && schemaMgr.initCacheContext(cacheInfo.cacheContext())) { |
| if (idx != null) |
| idx.registerCache(cacheName, schemaName, cacheInfo); |
| |
| return; |
| } |
| |
| synchronized (stateMux) { |
| boolean escape = cacheInfo.config().isSqlEscapeAll(); |
| |
| T3<Collection<QueryTypeCandidate>, Map<String, QueryTypeDescriptorImpl>, Map<String, QueryTypeDescriptorImpl>> |
| candRes = createQueryCandidates(cacheName, schemaName, cacheInfo, schema.entities(), escape); |
| |
| // Ensure that candidates have unique index names. |
| // Otherwise we will not be able to apply pending operations. |
| Collection<QueryTypeCandidate> cands = candRes.get1(); |
| Map<String, QueryTypeDescriptorImpl> tblTypMap = candRes.get2(); |
| Map<String, QueryTypeDescriptorImpl> idxTypMap = candRes.get3(); |
| |
| // Apply a pending operation which could have been completed as a no-op at this point. |
| // There could be only one in-flight operation for a cache. |
| for (SchemaOperation op : schemaOps.values()) { |
| if (F.eq(op.proposeMessage().operation().cacheName(), cacheName) |
| && F.eq(op.proposeMessage().deploymentId(), cacheInfo.dynamicDeploymentId())) { |
| if (op.started()) { |
| SchemaOperationWorker worker = op.manager().worker(); |
| |
| assert !worker.cacheRegistered(); |
| |
| if (!worker.nop()) { |
| IgniteInternalFuture fut = worker.future(); |
| |
| assert fut.isDone(); |
| |
| if (fut.error() == null) { |
| SchemaAbstractOperation op0 = op.proposeMessage().operation(); |
| |
| if (op0 instanceof SchemaIndexCreateOperation) { |
| SchemaIndexCreateOperation opCreate = (SchemaIndexCreateOperation)op0; |
| |
| QueryTypeDescriptorImpl typeDesc = tblTypMap.get(opCreate.tableName()); |
| |
| assert typeDesc != null; |
| |
| QueryUtils.processDynamicIndexChange(opCreate.indexName(), opCreate.index(), |
| typeDesc); |
| } |
| else if (op0 instanceof SchemaIndexDropOperation) { |
| SchemaIndexDropOperation opDrop = (SchemaIndexDropOperation)op0; |
| |
| QueryTypeDescriptorImpl typeDesc = idxTypMap.get(opDrop.indexName()); |
| |
| assert typeDesc != null; |
| |
| QueryUtils.processDynamicIndexChange(opDrop.indexName(), null, typeDesc); |
| } |
| else if (op0 instanceof SchemaAlterTableAddColumnOperation) { |
| SchemaAlterTableAddColumnOperation opAddCol = |
| (SchemaAlterTableAddColumnOperation)op0; |
| |
| QueryTypeDescriptorImpl typeDesc = tblTypMap.get(opAddCol.tableName()); |
| |
| assert typeDesc != null; |
| |
| processDynamicAddColumn(typeDesc, opAddCol.columns()); |
| } |
| else if (op0 instanceof SchemaAlterTableDropColumnOperation) { |
| SchemaAlterTableDropColumnOperation opDropCol = |
| (SchemaAlterTableDropColumnOperation)op0; |
| |
| QueryTypeDescriptorImpl typeDesc = tblTypMap.get(opDropCol.tableName()); |
| |
| assert typeDesc != null; |
| |
| processDynamicDropColumn(typeDesc, opDropCol.columns()); |
| } |
| else if (op0 instanceof SchemaAddQueryEntityOperation) { |
| SchemaAddQueryEntityOperation opEnableIdx = |
| (SchemaAddQueryEntityOperation)op0; |
| |
| cacheInfo.onSchemaAddQueryEntity(opEnableIdx); |
| |
| cands = createQueryCandidates( |
| opEnableIdx.cacheName(), |
| opEnableIdx.schemaName(), |
| cacheInfo, |
| opEnableIdx.entities(), |
| opEnableIdx.isSqlEscape() |
| ).get1(); |
| |
| schemaName = opEnableIdx.schemaName(); |
| } |
| else |
| assert false : "Unsupported operation: " + op0; |
| } |
| } |
| } |
| |
| break; |
| } |
| } |
| |
| // Ready to register at this point. |
| registerCache0(cacheName, schemaName, cacheInfo, cands, isSql); |
| } |
| } |
| finally { |
| ctx.cache().context().database().checkpointReadUnlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { |
| Collection<SchemaOperationClientFuture> futs; |
| |
| synchronized (stateMux) { |
| disconnected = true; |
| exchangeReady = false; |
| |
| // Clear client futures. |
| futs = new ArrayList<>(schemaCliFuts.values()); |
| |
| schemaCliFuts.clear(); |
| |
| // Clear operations data. |
| activeProposals.clear(); |
| schemaOps.clear(); |
| } |
| |
| // Complete client futures outside of synchronized block because they may have listeners/chains. |
| for (SchemaOperationClientFuture fut : futs) |
| fut.onDone(new SchemaOperationException("Client node is disconnected (operation result is unknown).")); |
| |
| if (idx != null) |
| idx.onDisconnected(reconnectFut); |
| |
| runningQryMgr.onDisconnected(); |
| } |
| |
| /** |
| * Initialize query infrastructure for a cache that has not been started. |
| * |
| * @param cacheDesc Cache descriptor. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void initQueryStructuresForNotStartedCache(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException { |
| QuerySchema schema = cacheDesc.schema() != null ? cacheDesc.schema() : new QuerySchema(); |
| |
| GridCacheContextInfo cacheInfo = new GridCacheContextInfo(cacheDesc); |
| |
| onCacheStart(cacheInfo, schema, cacheDesc.sql()); |
| } |
| |
| /** |
| * Handle cache start. Invoked either from GridCacheProcessor.onKernalStart() method or from exchange worker. |
| * When called for the first time, we initialize the topology to determine whether the current node is the |
| * coordinator. |
| * |
| * @param cacheInfo Cache context info. |
| * @param schema Index states. |
| * @param isSql {@code true} if the cache is created from SQL. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void onCacheStart( |
| GridCacheContextInfo cacheInfo, |
| QuerySchema schema, |
| boolean isSql |
| ) throws IgniteCheckedException { |
| if (!moduleEnabled()) |
| return; |
| |
| if (!busyLock.enterBusy()) |
| return; |
| |
| try { |
| onCacheStart0(cacheInfo, schema, isSql); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * Destroy H2 structures for caches that were not started. |
| * |
| * @param cacheName Cache name. |
| */ |
| public void onCacheStop(String cacheName) { |
| if (!moduleEnabled()) |
| return; |
| |
| GridCacheContextInfo cacheInfo = schemaMgr.cacheInfo(cacheName); |
| |
| if (cacheInfo != null) |
| onCacheStop(cacheInfo, true, true); |
| } |
| |
| /** |
| * @param cacheInfo Cache context info. |
| * @param removeIdx If {@code true}, will remove index. |
| * @param clearIdx If {@code true}, will clear the index. |
| */ |
| public void onCacheStop(GridCacheContextInfo cacheInfo, boolean removeIdx, boolean clearIdx) { |
| if (!moduleEnabled()) |
| return; |
| |
| if (!busyLock.enterBusy()) |
| return; |
| |
| try { |
| onCacheStop0(cacheInfo, removeIdx, clearIdx); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @param cacheInfo Cache context info. |
| */ |
| public void onClientCacheStop(GridCacheContextInfo cacheInfo) { |
| if (!moduleEnabled()) |
| return; |
| |
| if (!busyLock.enterBusy()) |
| return; |
| |
| try { |
| if (schemaMgr.clearCacheContext(cacheInfo.cacheContext())) { |
| if (idx != null) |
| idx.unregisterCache(cacheInfo); |
| } |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @return Skip field lookup flag. |
| */ |
| public boolean skipFieldLookup() { |
| return skipFieldLookup; |
| } |
| |
| /** |
| * @param skipFieldLookup Skip field lookup flag. |
| */ |
| public void skipFieldLookup(boolean skipFieldLookup) { |
| this.skipFieldLookup = skipFieldLookup; |
| } |
| |
| /** |
| * Register metadata locally for already registered caches. |
| * |
| * @param platformOnly Whether to register only non-Java platform types. |
| */ |
| public void registerMetadataForRegisteredCaches(boolean platformOnly) { |
| for (DynamicCacheDescriptor cacheDescriptor : ctx.cache().cacheDescriptors().values()) |
| registerBinaryMetadata(cacheDescriptor.cacheConfiguration(), cacheDescriptor.schema(), platformOnly); |
| } |
| |
| /** |
| * Handle a cache change request. |
| * |
| * @param batch Dynamic cache change batch request. |
| */ |
| public void onCacheChangeRequested(DynamicCacheChangeBatch batch) { |
| for (DynamicCacheChangeRequest req : batch.requests()) { |
| if (!req.start()) |
| continue; |
| |
| try { |
| registerBinaryMetadata(req.startCacheConfiguration(), req.schema(), false); |
| } |
| catch (BinaryObjectException e) { |
| ctx.cache().completeCacheStartFuture(req, false, e); |
| } |
| } |
| } |
| |
| /** |
| * Register binary metadata locally. |
| * |
| * @param ccfg Cache configuration. |
| * @param schema Schema for which metadata registration is required. |
| * @param platformOnly Whether to register only non-Java platform types. |
| * @throws BinaryObjectException If registration failed. |
| */ |
| private void registerBinaryMetadata(CacheConfiguration ccfg, QuerySchema schema, boolean platformOnly) throws BinaryObjectException { |
| if (schema != null) { |
| Collection<QueryEntity> qryEntities = schema.entities(); |
| |
| if (!F.isEmpty(qryEntities)) { |
| boolean binaryEnabled = ctx.cacheObjects().isBinaryEnabled(ccfg); |
| |
| if (binaryEnabled) { |
| for (QueryEntity qryEntity : qryEntities) { |
| registerTypeLocally(qryEntity.findKeyType(), platformOnly); |
| registerTypeLocally(qryEntity.findValueType(), platformOnly); |
| } |
| } |
| } |
| } |
| } |
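| // Illustrative sketch (type names are assumptions): for a QueryEntity with keyType |
| // "java.lang.Long" and valueType "org.example.Person", both class names are passed to |
| // registerTypeLocally(...) above when the cache stores objects in binary format. |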
| |
| /** |
| * Create query type candidates and mappings from table and index names to their type descriptors. |
| * |
| * @param cacheName Cache name. |
| * @param schemaName Schema name. |
| * @param cacheInfo Grid cache info. |
| * @param entities Collection of query entities. |
| * @param escape SQL escape flag. |
| * @return Triple of query type candidates and mappings from table and index names to their type descriptors. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private T3<Collection<QueryTypeCandidate>, Map<String, QueryTypeDescriptorImpl>, Map<String, QueryTypeDescriptorImpl>> |
| createQueryCandidates( |
| String cacheName, |
| String schemaName, |
| GridCacheContextInfo<?, ?> cacheInfo, |
| Collection<QueryEntity> entities, |
| boolean escape |
| ) throws IgniteCheckedException { |
| Collection<QueryTypeCandidate> cands = new ArrayList<>(); |
| |
| List<Class<?>> mustDeserializeClss = new ArrayList<>(); |
| |
| if (!F.isEmpty(entities)) { |
| for (QueryEntity qryEntity : entities) { |
| QueryTypeCandidate cand = QueryUtils.typeForQueryEntity( |
| ctx, |
| cacheName, |
| schemaName, |
| cacheInfo, |
| qryEntity, |
| mustDeserializeClss, |
| escape |
| ); |
| |
| cands.add(cand); |
| } |
| } |
| |
| // Ensure that candidates have unique index names. |
| // Otherwise we will not be able to apply pending operations. |
| Map<String, QueryTypeDescriptorImpl> tblTypMap = new HashMap<>(); |
| Map<String, QueryTypeDescriptorImpl> idxTypMap = new HashMap<>(); |
| |
| for (QueryTypeCandidate cand : cands) { |
| QueryTypeDescriptorImpl desc = cand.descriptor(); |
| |
| QueryTypeDescriptorImpl oldDesc = tblTypMap.put(desc.tableName(), desc); |
| |
| if (oldDesc != null) |
| throw new IgniteException("Duplicate table name [cache=" + cacheName + |
| ",tblName=" + desc.tableName() + |
| ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']'); |
| |
| for (String idxName : desc.indexes().keySet()) { |
| oldDesc = idxTypMap.put(idxName, desc); |
| |
| if (oldDesc != null) |
| throw new IgniteException("Duplicate index name [cache=" + cacheName + |
| ",idxName=" + idxName + |
| ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']'); |
| } |
| } |
| |
| // Warn about possible implicit deserialization. |
| if (!mustDeserializeClss.isEmpty()) { |
| U.warnDevOnly(log, "Some classes in query configuration cannot be written in binary format " + |
| "because they either implement Externalizable interface or have writeObject/readObject " + |
| "methods. Instances of these classes will be deserialized in order to build indexes. Please " + |
| "ensure that all nodes have these classes in classpath. To enable binary serialization " + |
| "either implement " + Binarylizable.class.getSimpleName() + " interface or set explicit " + |
| "serializer using BinaryTypeConfiguration.setSerializer() method: " + mustDeserializeClss); |
| } |
| |
| return new T3<>(cands, tblTypMap, idxTypMap); |
| } |
| |
| /** |
| * Register class metadata locally if it was not registered earlier. |
| * |
| * @param clsName Class name for which the metadata should be registered. |
| * @param platformOnly Whether to register only non-Java platform types. |
| * @throws BinaryObjectException If registration failed. |
| */ |
| private void registerTypeLocally(String clsName, boolean platformOnly) throws BinaryObjectException { |
| if (clsName == null) |
| return; |
| |
| IgniteCacheObjectProcessor cacheObjProc = ctx.cacheObjects(); |
| |
| if (cacheObjProc instanceof CacheObjectBinaryProcessorImpl) { |
| CacheObjectBinaryProcessorImpl binProc = (CacheObjectBinaryProcessorImpl)cacheObjProc; |
| |
| Class<?> cls = U.box(U.classForName(clsName, null, true)); |
| |
| if (cls != null) { |
| if (!platformOnly) |
| binProc.binaryContext().registerClass(cls, true, false, true); |
| } |
| else |
| registerPlatformTypeLocally(clsName, binProc); |
| } |
| } |
| |
| /** |
| * Registers platform type locally. |
| * |
| * @param clsName Class name. |
| * @param binProc Binary processor. |
| */ |
| private void registerPlatformTypeLocally(String clsName, CacheObjectBinaryProcessorImpl binProc) { |
| PlatformProcessor platformProc = ctx.platform(); |
| |
| if (platformProc == null || !platformProc.hasContext()) |
| return; |
| |
| PlatformContext platformCtx = platformProc.context(); |
| BinaryMetadata meta = platformCtx.getBinaryType(clsName); |
| |
| if (meta != null) |
| binProc.binaryContext().registerClassLocally( |
| meta.wrap(binProc.binaryContext()), |
| false, |
| platformCtx.getMarshallerPlatformId()); |
| } |
| |
| /** |
| * Handle custom discovery message. |
| * |
| * @param msg Message. |
| */ |
| public void onDiscovery(SchemaAbstractDiscoveryMessage msg) { |
| IgniteUuid id = msg.id(); |
| |
| if (!dscoMsgIdHist.add(id)) { |
| U.warn(log, "Received duplicate schema custom discovery message (will ignore) [opId=" + |
| msg.operation().id() + ", msg=" + msg + ']'); |
| |
| return; |
| } |
| |
| if (msg instanceof SchemaProposeDiscoveryMessage) { |
| SchemaProposeDiscoveryMessage msg0 = (SchemaProposeDiscoveryMessage)msg; |
| |
| boolean exchange = onSchemaProposeDiscovery(msg0); |
| |
| msg0.exchange(exchange); |
| } |
| else if (msg instanceof SchemaFinishDiscoveryMessage) { |
| SchemaFinishDiscoveryMessage msg0 = (SchemaFinishDiscoveryMessage)msg; |
| |
| onSchemaFinishDiscovery(msg0); |
| } |
| else |
| U.warn(log, "Received unsupported schema custom discovery message (will ignore) [opId=" + |
| msg.operation().id() + ", msg=" + msg + ']'); |
| } |
| |
| /** |
| * Prepare change on started cache. |
| * |
| * @param op Operation. |
| * @return Result: affected type, nop flag, error. |
| */ |
| private T3<QueryTypeDescriptorImpl, Boolean, SchemaOperationException> prepareChangeOnStartedCache( |
| SchemaAbstractOperation op) { |
| QueryTypeDescriptorImpl type = null; |
| boolean nop = false; |
| SchemaOperationException err = null; |
| |
| String cacheName = op.cacheName(); |
| |
| if (op instanceof SchemaIndexCreateOperation) { |
| SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op; |
| |
| QueryIndex idx = op0.index(); |
| |
| // Make sure table exists. |
| String tblName = op0.tableName(); |
| |
| type = type(cacheName, tblName); |
| |
| if (type == null) |
| err = new SchemaOperationException(SchemaOperationException.CODE_TABLE_NOT_FOUND, tblName); |
| else { |
| // Make sure that index can be applied to the given table. |
| for (String idxField : idx.getFieldNames()) { |
| if (!type.fields().containsKey(idxField)) { |
| err = new SchemaOperationException(SchemaOperationException.CODE_COLUMN_NOT_FOUND, |
| idxField); |
| |
| break; |
| } |
| } |
| } |
| |
| // Check conflict with other indexes. |
| if (err == null) { |
| String idxName = op0.index().getName(); |
| |
| QueryIndexKey idxKey = new QueryIndexKey(op.schemaName(), idxName); |
| |
| if (idxs.get(idxKey) != null) { |
| if (op0.ifNotExists()) |
| nop = true; |
| else |
| err = new SchemaOperationException(SchemaOperationException.CODE_INDEX_EXISTS, idxName); |
| } |
| } |
| } |
| else if (op instanceof SchemaIndexDropOperation) { |
| SchemaIndexDropOperation op0 = (SchemaIndexDropOperation)op; |
| |
| String idxName = op0.indexName(); |
| |
| QueryIndexDescriptorImpl oldIdx = idxs.get(new QueryIndexKey(op.schemaName(), idxName)); |
| |
| if (oldIdx == null) { |
| if (op0.ifExists()) |
| nop = true; |
| else |
| err = new SchemaOperationException(SchemaOperationException.CODE_INDEX_NOT_FOUND, idxName); |
| } |
| else |
| type = oldIdx.typeDescriptor(); |
| } |
| else if (op instanceof SchemaAlterTableAddColumnOperation) { |
| SchemaAlterTableAddColumnOperation op0 = (SchemaAlterTableAddColumnOperation)op; |
| |
| type = type(cacheName, op0.tableName()); |
| |
| if (type == null) { |
| if (op0.ifTableExists()) |
| nop = true; |
| else |
| err = new SchemaOperationException(SchemaOperationException.CODE_TABLE_NOT_FOUND, |
| op0.tableName()); |
| } |
| else { |
| for (QueryField col : op0.columns()) { |
| if (type.hasField(col.name())) { |
| if (op0.ifNotExists()) { |
| assert op0.columns().size() == 1; |
| |
| nop = true; |
| } |
| else |
| err = new SchemaOperationException(CODE_COLUMN_EXISTS, col.name()); |
| } |
| else if (!checkFieldOnBinaryType(type.typeId(), col)) |
| err = new SchemaOperationException(CODE_COLUMN_EXISTS, "with a different type."); |
| } |
| } |
| } |
| else if (op instanceof SchemaAlterTableDropColumnOperation) { |
| SchemaAlterTableDropColumnOperation op0 = (SchemaAlterTableDropColumnOperation)op; |
| |
| type = type(cacheName, op0.tableName()); |
| |
| if (type == null) { |
| if (op0.ifTableExists()) |
| nop = true; |
| else |
| err = new SchemaOperationException(SchemaOperationException.CODE_TABLE_NOT_FOUND, |
| op0.tableName()); |
| } |
| else { |
| for (String name : op0.columns()) { |
| if (err != null) |
| break; |
| |
| if (!type.hasField(name)) { |
| if (op0.ifExists()) { |
| assert op0.columns().size() == 1; |
| |
| nop = true; |
| } |
| else |
| err = new SchemaOperationException(SchemaOperationException.CODE_COLUMN_NOT_FOUND, name); |
| |
| break; |
| } |
| |
| err = QueryUtils.validateDropColumn(type, name); |
| } |
| } |
| } |
| else if (op instanceof SchemaAddQueryEntityOperation) { |
| if (cacheNames.contains(op.cacheName())) |
| err = new SchemaOperationException(SchemaOperationException.CODE_CACHE_ALREADY_INDEXED, op.cacheName()); |
| } |
| else |
| err = new SchemaOperationException("Unsupported operation: " + op); |
| |
| return new T3<>(type, nop, err); |
| } |
| |
| /** |
| * Checks that, if a new column already exists in the {@link BinaryType}, its type does not change. |
| * |
| * @param typeId Binary type id. |
| * @param qryField New query field. |
| * @return {@code True} if the field does not exist yet or its type is unchanged. |
| */ |
| private boolean checkFieldOnBinaryType(int typeId, QueryField qryField) { |
| assert nonNull(qryField); |
| |
| try { |
| BinaryType binaryType = ctx.cacheObjects().metadata(typeId); |
| String binaryFieldType = nonNull(binaryType) ? binaryType.fieldTypeName(qryField.name()) : null; |
| |
| return isNull(binaryFieldType) || |
| binaryFieldType.equals(fieldTypeName(typeByClass(Class.forName(qryField.typeName())))); |
| } |
| catch (ClassNotFoundException e) { |
| throw new IgniteException( |
| "Class not found for property [name=" + qryField.name() + ", type=" + qryField.typeName() + ']' |
| ); |
| } |
| } |
| |
| /** |
| * Prepare operation on non-started cache. |
| * |
| * @param op Operation. |
| * @param desc Dynamic cache descriptor. |
| * @return Result: nop flag, error. |
| */ |
| private T2<Boolean, SchemaOperationException> prepareChangeOnNotStartedCache( |
| SchemaAbstractOperation op, |
| DynamicCacheDescriptor desc |
| ) { |
| boolean nop = false; |
| SchemaOperationException err = null; |
| |
| if (op instanceof SchemaAddQueryEntityOperation) { |
| if (cacheSupportSql(desc.cacheConfiguration())) |
| err = new SchemaOperationException(SchemaOperationException.CODE_CACHE_ALREADY_INDEXED, desc.cacheName()); |
| |
| return new T2<>(nop, err); |
| } |
| |
| // Build table and index maps. |
| QuerySchema schema = desc.schema(); |
| Map<String, QueryEntity> tblMap = new HashMap<>(); |
| Map<String, T2<QueryEntity, QueryIndex>> idxMap = new HashMap<>(); |
| |
| for (QueryEntity entity : schema.entities()) { |
| String tblName = entity.getTableName(); |
| |
| QueryEntity oldEntity = tblMap.put(tblName, entity); |
| |
| if (oldEntity != null) { |
| err = new SchemaOperationException("Invalid schema state (duplicate table found): " + tblName); |
| |
| break; |
| } |
| |
| for (QueryIndex entityIdx : entity.getIndexes()) { |
| String idxName = entityIdx.getName(); |
| |
| T2<QueryEntity, QueryIndex> oldIdxEntity = idxMap.put(idxName, new T2<>(entity, entityIdx)); |
| |
| if (oldIdxEntity != null) { |
| err = new SchemaOperationException("Invalid schema state (duplicate index found): " + |
| idxName); |
| |
| break; |
| } |
| } |
| |
| if (err != null) |
| break; |
| } |
| |
| // Now check whether operation can be applied to schema. |
| if (op instanceof SchemaIndexCreateOperation) { |
| SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op; |
| |
| String idxName = op0.indexName(); |
| |
| T2<QueryEntity, QueryIndex> oldIdxEntity = idxMap.get(idxName); |
| |
| if (oldIdxEntity == null) { |
| String tblName = op0.tableName(); |
| |
| QueryEntity oldEntity = tblMap.get(tblName); |
| |
| if (oldEntity == null) |
| err = new SchemaOperationException(SchemaOperationException.CODE_TABLE_NOT_FOUND, tblName); |
| else { |
| for (String fieldName : op0.index().getFields().keySet()) { |
| Set<String> oldEntityFields = new HashSet<>(oldEntity.getFields().keySet()); |
| |
| for (Map.Entry<String, String> alias : oldEntity.getAliases().entrySet()) { |
| oldEntityFields.remove(alias.getKey()); |
| oldEntityFields.add(alias.getValue()); |
| } |
| |
| if (!oldEntityFields.contains(fieldName)) { |
| err = new SchemaOperationException(SchemaOperationException.CODE_COLUMN_NOT_FOUND, |
| fieldName); |
| |
| break; |
| } |
| } |
| } |
| } |
| else { |
| if (op0.ifNotExists()) |
| nop = true; |
| else |
| err = new SchemaOperationException(SchemaOperationException.CODE_INDEX_EXISTS, idxName); |
| } |
| } |
| else if (op instanceof SchemaIndexDropOperation) { |
| SchemaIndexDropOperation op0 = (SchemaIndexDropOperation)op; |
| |
| String idxName = op0.indexName(); |
| |
| T2<QueryEntity, QueryIndex> oldIdxEntity = idxMap.get(idxName); |
| |
| if (oldIdxEntity == null) { |
| if (op0.ifExists()) |
| nop = true; |
| else |
| err = new SchemaOperationException(SchemaOperationException.CODE_INDEX_NOT_FOUND, idxName); |
| } |
| } |
| else if (op instanceof SchemaAlterTableAddColumnOperation) { |
| SchemaAlterTableAddColumnOperation op0 = (SchemaAlterTableAddColumnOperation)op; |
| |
| QueryEntity e = tblMap.get(op0.tableName()); |
| |
| if (e == null) { |
| if (op0.ifTableExists()) |
| nop = true; |
| else |
| err = new SchemaOperationException(SchemaOperationException.CODE_TABLE_NOT_FOUND, |
| op0.tableName()); |
| } |
| else { |
| for (QueryField fld : op0.columns()) { |
| if (e.getFields().containsKey(fld.name())) { |
| if (op0.ifNotExists()) { |
| assert op0.columns().size() == 1; |
| |
| nop = true; |
| } |
| else |
| err = new SchemaOperationException(CODE_COLUMN_EXISTS, fld.name()); |
| } |
| } |
| } |
| } |
| else if (op instanceof SchemaAlterTableDropColumnOperation) { |
| SchemaAlterTableDropColumnOperation op0 = (SchemaAlterTableDropColumnOperation)op; |
| |
| QueryEntity e = tblMap.get(op0.tableName()); |
| |
| if (e == null) { |
| if (op0.ifTableExists()) |
| nop = true; |
| else |
| err = new SchemaOperationException(SchemaOperationException.CODE_TABLE_NOT_FOUND, |
| op0.tableName()); |
| } |
| else { |
| for (String colName : op0.columns()) { |
| if (err != null) |
| break; |
| |
| String fldName = QueryUtils.fieldNameByAlias(e, colName); |
| |
| if (!e.getFields().containsKey(fldName)) { |
| if (op0.ifExists()) { |
| assert op0.columns().size() == 1; |
| |
| nop = true; |
| } |
| else |
| err = new SchemaOperationException(SchemaOperationException.CODE_COLUMN_NOT_FOUND, fldName); |
| |
| break; |
| } |
| |
| err = QueryUtils.validateDropColumn(e, fldName, colName); |
| } |
| } |
| } |
| else |
| err = new SchemaOperationException("Unsupported operation: " + op); |
| |
| return new T2<>(nop, err); |
| } |
| |
| /** |
| * Invoked when coordinator finished ensuring that all participants are ready. |
| * |
| * @param op Operation. |
| * @param err Error (if any). |
| * @param nop No-op flag. |
| */ |
| public void onCoordinatorFinished(SchemaAbstractOperation op, @Nullable SchemaOperationException err, boolean nop) { |
| synchronized (stateMux) { |
| SchemaFinishDiscoveryMessage msg = new SchemaFinishDiscoveryMessage(op, err, nop); |
| |
| try { |
| ctx.discovery().sendCustomEvent(msg); |
| } |
| catch (Exception e) { |
| // Failed to send finish message over discovery. This is something unrecoverable. |
| U.warn(log, "Failed to send schema finish discovery message [opId=" + op.id() + ']', e); |
| } |
| } |
| } |
| |
| /** |
| * Get current coordinator node. |
| * |
| * @return Coordinator node. |
| */ |
| private ClusterNode coordinator() { |
| assert !ctx.clientNode(); |
| |
| synchronized (stateMux) { |
| if (crd == null) { |
| ClusterNode crd0 = null; |
| |
| for (ClusterNode node : ctx.discovery().aliveServerNodes()) { |
| if (crd0 == null || crd0.order() > node.order()) |
| crd0 = node; |
| } |
| |
| assert crd0 != null; |
| |
| crd = crd0; |
| } |
| |
| return crd; |
| } |
| } |
| |
| /** |
| * Get rid of stale IO messages received from other nodes which joined while the operation was in progress. |
| * |
| * @param opId Operation ID. |
| */ |
| private void cleanStaleStatusMessages(UUID opId) { |
| Iterator<SchemaOperationStatusMessage> it = pendingMsgs.iterator(); |
| |
| while (it.hasNext()) { |
| SchemaOperationStatusMessage statusMsg = it.next(); |
| |
| if (F.eq(opId, statusMsg.operationId())) { |
| it.remove(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Dropped operation status message because it is already completed [opId=" + opId + |
| ", rmtNode=" + statusMsg.senderNodeId() + ']'); |
| } |
| } |
| } |
| |
| /** |
| * Apply positive index operation result. |
| * |
| * @param op Operation. |
| * @param type Type descriptor (if available). |
| */ |
| public void onLocalOperationFinished(SchemaAbstractOperation op, @Nullable QueryTypeDescriptorImpl type) { |
| synchronized (stateMux) { |
| if (disconnected) |
| return; |
| |
| // No need to apply anything to obsolete type. |
| if (type == null || type.obsolete()) { |
| if (log.isDebugEnabled()) |
| log.debug("Local operation finished, but type descriptor is either missing or obsolete " + |
| "(will ignore) [opId=" + op.id() + ']'); |
| |
| return; |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Local operation finished successfully [opId=" + op.id() + ']'); |
| |
| String schemaName = op.schemaName(); |
| |
| try { |
| if (op instanceof SchemaIndexCreateOperation) { |
| SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op; |
| |
| QueryUtils.processDynamicIndexChange(op0.indexName(), op0.index(), type); |
| |
| QueryIndexDescriptorImpl idxDesc = type.index(op0.indexName()); |
| |
| QueryIndexKey idxKey = new QueryIndexKey(schemaName, op0.indexName()); |
| |
| idxs.put(idxKey, idxDesc); |
| } |
| else if (op instanceof SchemaIndexDropOperation) { |
| SchemaIndexDropOperation op0 = (SchemaIndexDropOperation)op; |
| |
| QueryUtils.processDynamicIndexChange(op0.indexName(), null, type); |
| |
| QueryIndexKey idxKey = new QueryIndexKey(schemaName, op0.indexName()); |
| |
| idxs.remove(idxKey); |
| } |
| else { |
| assert (op instanceof SchemaAddQueryEntityOperation || op instanceof SchemaAlterTableAddColumnOperation || |
| op instanceof SchemaAlterTableDropColumnOperation); |
| |
| // No-op - all processing is done at "local" stage |
| // as we must update both table and type descriptor atomically. |
| } |
| } |
| catch (IgniteCheckedException e) { |
| U.warn(log, "Failed to finish index operation [opId=" + op.id() + " op=" + op + ']', e); |
| } |
| } |
| } |
| |
| /** |
| * Handle node leave. |
| * |
| * @param node Node. |
| */ |
| public void onNodeLeave(ClusterNode node) { |
| synchronized (stateMux) { |
| // Clients do not send status messages and are never coordinators. |
| if (ctx.clientNode()) |
| return; |
| |
| ClusterNode crd0 = coordinator(); |
| |
| if (F.eq(node.id(), crd0.id())) { |
| crd = null; |
| |
| crd0 = coordinator(); |
| } |
| |
| for (SchemaOperation op : schemaOps.values()) { |
| if (op.started()) { |
| op.manager().onNodeLeave(node.id(), crd0); |
| |
| if (crd0.isLocal()) |
| unwindPendingMessages(op.id(), op.manager()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Process schema operation. |
| * |
| * @param op Operation. |
| * @param type Type descriptor. |
| * @param depId Cache deployment ID. |
| * @param cancelTok Cancel token. |
| * @throws SchemaOperationException If failed. |
| */ |
| public void processSchemaOperationLocal(SchemaAbstractOperation op, QueryTypeDescriptorImpl type, IgniteUuid depId, |
| IndexRebuildCancelToken cancelTok) throws SchemaOperationException { |
| if (log.isDebugEnabled()) |
| log.debug("Started local index operation [opId=" + op.id() + ']'); |
| |
| String cacheName = op.cacheName(); |
| |
| GridCacheContextInfo<?, ?> cacheInfo = null; |
| |
| if (op instanceof SchemaAddQueryEntityOperation) { |
| GridCacheContext<?, ?> cctx = ctx.cache().context().cacheContext(CU.cacheId(cacheName)); |
| |
| if (cctx != null) |
| cacheInfo = new GridCacheContextInfo<>(cctx, false); |
| else |
| return; |
| |
| } |
| else |
| cacheInfo = schemaMgr.cacheInfo(cacheName); |
| |
| if (cacheInfo == null || !F.eq(depId, cacheInfo.dynamicDeploymentId())) |
| throw new SchemaOperationException(SchemaOperationException.CODE_CACHE_NOT_FOUND, cacheName); |
| |
| try { |
| if (op instanceof SchemaIndexCreateOperation) { |
| SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op; |
| |
| QueryIndexDescriptorImpl idxDesc = QueryUtils.createIndexDescriptor(type, op0.index()); |
| |
| SchemaIndexCacheVisitor visitor; |
| |
| if (cacheInfo.isCacheContextInited()) { |
| int buildIdxPoolSize = ctx.config().getBuildIndexThreadPoolSize(); |
| int parallel = op0.parallel(); |
| |
| if (parallel > buildIdxPoolSize) { |
| String idxName = op0.indexName(); |
| |
| log.warning("Provided parallelism " + parallel + " for creation of index " + idxName + |
| " is greater than the number of index building threads. Will use " + buildIdxPoolSize + |
| " threads to build index. Increase by IgniteConfiguration.setBuildIndexThreadPoolSize" + |
| " and restart the node if you want to use more threads. [tableName=" + op0.tableName() + |
| ", indexName=" + idxName + ", requestedParallelism=" + parallel + ", buildIndexPoolSize=" + |
| buildIdxPoolSize + "]"); |
| } |
| |
| GridFutureAdapter<Void> createIdxFut = new GridFutureAdapter<>(); |
| |
| GridCacheContext<?, ?> cacheCtx = cacheInfo.cacheContext(); |
| |
| visitor = new SchemaIndexCacheVisitorImpl( |
| cacheCtx, |
| cancelTok, |
| createIdxFut |
| ) { |
| /** {@inheritDoc} */ |
| @Override public void visit(SchemaIndexCacheVisitorClosure clo) { |
| idxBuildStatusStorage.onStartBuildNewIndex(cacheCtx); |
| |
| try { |
| super.visit(clo); |
| |
| buildIdxFut.get(); |
| } |
| catch (Exception e) { |
| throw new IgniteException(e); |
| } |
| finally { |
| idxBuildStatusStorage.onFinishBuildNewIndex(cacheName); |
| } |
| } |
| }; |
| } |
| else |
| // For caches that are not started we shouldn't add any data to the index. |
| visitor = clo -> {}; |
| |
| schemaMgr.createIndex(op0.schemaName(), op0.tableName(), idxDesc, op0.ifNotExists(), visitor); |
| } |
| else if (op instanceof SchemaIndexDropOperation) { |
| SchemaIndexDropOperation op0 = (SchemaIndexDropOperation)op; |
| |
| schemaMgr.dropIndex(op0.schemaName(), op0.indexName(), op0.ifExists()); |
| } |
| else if (op instanceof SchemaAlterTableAddColumnOperation) { |
| SchemaAlterTableAddColumnOperation op0 = (SchemaAlterTableAddColumnOperation)op; |
| |
| processDynamicAddColumn(type, op0.columns()); |
| |
| schemaMgr.addColumn(op0.schemaName(), op0.tableName(), op0.columns(), op0.ifTableExists(), op0.ifNotExists()); |
| } |
| else if (op instanceof SchemaAlterTableDropColumnOperation) { |
| SchemaAlterTableDropColumnOperation op0 = (SchemaAlterTableDropColumnOperation)op; |
| |
| processDynamicDropColumn(type, op0.columns()); |
| |
| schemaMgr.dropColumn(op0.schemaName(), op0.tableName(), op0.columns(), op0.ifTableExists(), op0.ifExists()); |
| } |
| else if (op instanceof SchemaAddQueryEntityOperation) { |
| SchemaAddQueryEntityOperation op0 = (SchemaAddQueryEntityOperation)op; |
| |
| if (!cacheNames.contains(op0.cacheName())) { |
| cacheInfo.onSchemaAddQueryEntity(op0); |
| |
| T3<Collection<QueryTypeCandidate>, Map<String, QueryTypeDescriptorImpl>, Map<String, QueryTypeDescriptorImpl>> |
| candRes = createQueryCandidates(op0.cacheName(), op0.schemaName(), cacheInfo, op0.entities(), |
| op0.isSqlEscape()); |
| |
| registerCache0(op0.cacheName(), op.schemaName(), cacheInfo, candRes.get1(), false); |
| } |
| |
| if (idxRebuildFutStorage.prepareRebuildIndexes(singleton(cacheInfo.cacheId()), null).isEmpty()) |
| rebuildIndexesFromHash0(cacheInfo.cacheContext(), false, cancelTok); |
| else { |
| if (log.isInfoEnabled()) |
| log.info("Rebuilding indexes for the cache is already in progress: " + cacheInfo.name()); |
| } |
| } |
| else |
| throw new SchemaOperationException("Unsupported operation: " + op); |
| } |
| catch (Throwable e) { |
| if (e instanceof SchemaOperationException) |
| throw (SchemaOperationException)e; |
| else |
| throw new SchemaOperationException("Schema change operation failed: " + e.getMessage(), e); |
| } |
| } |
| |
| /** |
| * Create cache and table from given query entity. |
| * |
| * @param schemaName Schema name to create table in. Case sensitive, must not be "quoted". |
| * @param entity Entity to create table from. |
| * @param templateName Template name. |
| * @param cacheName Cache name. |
| * @param cacheGroup Cache group name. |
| * @param dataRegion Data region name. |
| * @param affinityKey Affinity key column name. |
| * @param atomicityMode Atomicity mode. |
| * @param writeSyncMode Write synchronization mode. |
| * @param backups Backups. |
| * @param ifNotExists Quietly ignore this command if table already exists. |
| * @param encrypted Encrypted flag. |
| * @param qryParallelism Query parallelism value for the configuration of the underlying cache. |
| * @throws IgniteCheckedException If failed. |
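| * |
| * <p>A minimal usage sketch (illustrative only: {@code proc} stands for this processor instance |
| * and the entity layout is a placeholder, not part of this API contract): |
| * <pre>{@code |
| * QueryEntity entity = new QueryEntity(Long.class.getName(), "Person") |
| *     .setTableName("PERSON") |
| *     .setFields(new LinkedHashMap<>( |
| *         F.asMap("ID", Long.class.getName(), "NAME", String.class.getName()))); |
| * |
| * // Create a PARTITIONED table with one backup in the given schema. |
| * proc.dynamicTableCreate("PUBLIC", entity, QueryUtils.TEMPLATE_PARTITIONED, null, null, null, null, |
| *     CacheAtomicityMode.ATOMIC, CacheWriteSynchronizationMode.FULL_SYNC, 1, true, false, null); |
| * }</pre> |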
| */ |
| public void dynamicTableCreate( |
| String schemaName, |
| QueryEntity entity, |
| String templateName, |
| String cacheName, |
| String cacheGroup, |
| @Nullable String dataRegion, |
| String affinityKey, |
| @Nullable CacheAtomicityMode atomicityMode, |
| @Nullable CacheWriteSynchronizationMode writeSyncMode, |
| @Nullable Integer backups, |
| boolean ifNotExists, |
| @Nullable Boolean encrypted, |
| @Nullable Integer qryParallelism |
| ) throws IgniteCheckedException { |
| assert !F.isEmpty(templateName); |
| assert backups == null || backups >= 0; |
| assert qryParallelism == null || qryParallelism > 0; |
| |
| CacheConfiguration<?, ?> ccfg = ctx.cache().getConfigFromTemplate(templateName); |
| |
| if (ccfg == null) { |
| if (QueryUtils.TEMPLATE_PARTITIONED.equalsIgnoreCase(templateName)) |
| ccfg = new CacheConfiguration<>().setCacheMode(CacheMode.PARTITIONED); |
| else if (QueryUtils.TEMPLATE_REPLICATED.equalsIgnoreCase(templateName)) |
| ccfg = new CacheConfiguration<>().setCacheMode(CacheMode.REPLICATED); |
| else |
| throw new SchemaOperationException(SchemaOperationException.CODE_CACHE_NOT_FOUND, templateName); |
| |
| ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); |
| } |
| |
| if (!F.isEmpty(ccfg.getQueryEntities())) |
| throw new SchemaOperationException("Template cache already contains query entities which it should not: " + |
| templateName); |
| |
| if (!F.isEmpty(entity.getNotNullFields())) |
| QueryUtils.checkNotNullAllowed(ccfg); |
| |
| if (F.isEmpty(cacheName)) |
| cacheName = QueryUtils.createTableCacheName(schemaName, entity.getTableName()); |
| |
| ccfg.setName(cacheName); |
| |
| if (!F.isEmpty(cacheGroup)) |
| ccfg.setGroupName(cacheGroup); |
| |
| if (!F.isEmpty(dataRegion)) |
| ccfg.setDataRegionName(dataRegion); |
| |
| if (atomicityMode != null) |
| ccfg.setAtomicityMode(atomicityMode); |
| |
| if (writeSyncMode != null) |
| ccfg.setWriteSynchronizationMode(writeSyncMode); |
| |
| if (backups != null) |
| ccfg.setBackups(backups); |
| |
| if (qryParallelism != null) |
| ccfg.setQueryParallelism(qryParallelism); |
| |
| if (encrypted != null) |
| ccfg.setEncryptionEnabled(encrypted); |
| |
| ccfg.setSqlSchema("\"" + schemaName + "\""); |
| ccfg.setSqlEscapeAll(true); |
| ccfg.setQueryEntities(singleton(entity)); |
| |
| if (!QueryUtils.isCustomAffinityMapper(ccfg.getAffinityMapper())) |
| ccfg.setAffinityMapper(null); |
| |
| if (affinityKey != null) |
| ccfg.setKeyConfiguration(new CacheKeyConfiguration(entity.getKeyType(), affinityKey)); |
| |
| boolean res; |
| |
| try { |
| res = ctx.grid().getOrCreateCache0(ccfg, true).get2(); |
| } |
| catch (CacheException e) { |
| if (e.getCause() instanceof SchemaOperationException) |
| throw (SchemaOperationException)e.getCause(); |
| else |
| throw e; |
| } |
| |
| if (!res && !ifNotExists) |
| throw new SchemaOperationException(SchemaOperationException.CODE_TABLE_EXISTS, entity.getTableName()); |
| } |
| |
| /** |
| * Drop table by destroying its cache, provided the table maps 1:1 to a dedicated cache. |
| * |
| * @param cacheName Cache name. |
| * @param tblName Table name. |
| * @param ifExists Quietly ignore this command if table does not exist. |
| * @throws SchemaOperationException if {@code ifExists} is {@code false} and cache was not found. |
| */ |
| public void dynamicTableDrop(String cacheName, String tblName, boolean ifExists) throws SchemaOperationException { |
| GridCacheContext currCache = this.curCache.get(); |
| |
| if (currCache != null && F.eq(currCache.name(), cacheName)) |
| throw new IgniteSQLException("DROP TABLE cannot be called from the same cache that holds " + |
| "the table being dropped [cacheName-" + cacheName + ", tblName=" + tblName + ']', |
| IgniteQueryErrorCode.UNSUPPORTED_OPERATION); |
| |
| boolean res = ctx.grid().destroyCache0(cacheName, true); |
| |
| if (!res && !ifExists) |
| throw new SchemaOperationException(SchemaOperationException.CODE_TABLE_NOT_FOUND, tblName); |
| } |
| |
| /** |
| * Register cache in indexing SPI. |
| * |
| * @param cacheName Cache name. |
| * @param schemaName Schema name. |
| * @param cacheInfo Cache context info. |
| * @param cands Candidates. |
| * @param isSql {@code true} if cache creation was initiated from SQL. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void registerCache0( |
| String cacheName, |
| String schemaName, |
| GridCacheContextInfo<?, ?> cacheInfo, |
| Collection<QueryTypeCandidate> cands, |
| boolean isSql |
| ) throws IgniteCheckedException { |
| synchronized (stateMux) { |
| if (moduleEnabled()) { |
| ctx.indexProcessor().idxRowCacheRegistry().onCacheRegistered(cacheInfo); |
| |
| schemaMgr.onCacheCreated(cacheName, schemaName, cacheInfo.config().getSqlFunctionClasses()); |
| |
| if (idx != null) |
| idx.registerCache(cacheName, schemaName, cacheInfo); |
| } |
| |
| try { |
| for (QueryTypeCandidate cand : cands) { |
| QueryTypeIdKey typeId = cand.typeId(); |
| QueryTypeIdKey altTypeId = cand.alternativeTypeId(); |
| QueryTypeDescriptorImpl desc = cand.descriptor(); |
| |
| if (typesByName.putIfAbsent(new QueryTypeNameKey(cacheName, desc.name()), desc) != null) |
| throw new IgniteCheckedException("Type with name '" + desc.name() + "' already indexed " + |
| "in cache '" + cacheName + "'."); |
| |
| types.put(typeId, desc); |
| |
| if (altTypeId != null) |
| types.put(altTypeId, desc); |
| |
| for (QueryIndexDescriptorImpl idx : desc.indexes0()) { |
| QueryIndexKey idxKey = new QueryIndexKey(schemaName, idx.name()); |
| |
| QueryIndexDescriptorImpl oldIdx = idxs.putIfAbsent(idxKey, idx); |
| |
| if (oldIdx != null) { |
| throw new IgniteException("Duplicate index name [cache=" + cacheName + |
| ", schemaName=" + schemaName + ", idxName=" + idx.name() + |
| ", existingTable=" + oldIdx.typeDescriptor().tableName() + |
| ", table=" + desc.tableName() + ']'); |
| } |
| } |
| |
| if (moduleEnabled()) |
| schemaMgr.onCacheTypeCreated(cacheInfo, desc, isSql); |
| } |
| |
| cacheNames.add(CU.mask(cacheName)); |
| } |
| catch (IgniteCheckedException | RuntimeException e) { |
| onCacheStop0(cacheInfo, true, true); |
| |
| throw e; |
| } |
| } |
| } |
| |
| /** |
| * Unregister cache.<p> |
| * Use with {@link #busyLock} where appropriate. |
| * |
| * @param cacheInfo Cache context info. |
| * @param destroy Destroy flag. |
| * @param clearIdx Clear flag. |
| */ |
| public void onCacheStop0(GridCacheContextInfo cacheInfo, boolean destroy, boolean clearIdx) { |
| if (!moduleEnabled() || !cacheNames.contains(cacheInfo.name())) |
| return; |
| |
| String cacheName = cacheInfo.name(); |
| |
| synchronized (stateMux) { |
| // Clear types. |
| Iterator<Map.Entry<QueryTypeIdKey, QueryTypeDescriptorImpl>> it = types.entrySet().iterator(); |
| |
| while (it.hasNext()) { |
| Map.Entry<QueryTypeIdKey, QueryTypeDescriptorImpl> entry = it.next(); |
| |
| if (F.eq(cacheName, entry.getKey().cacheName())) { |
| it.remove(); |
| |
| typesByName.remove(new QueryTypeNameKey(cacheName, entry.getValue().name())); |
| |
| entry.getValue().markObsolete(); |
| } |
| } |
| |
| // Clear indexes. |
| Iterator<Map.Entry<QueryIndexKey, QueryIndexDescriptorImpl>> idxIt = idxs.entrySet().iterator(); |
| |
| while (idxIt.hasNext()) { |
| Map.Entry<QueryIndexKey, QueryIndexDescriptorImpl> idxEntry = idxIt.next(); |
| |
| if (F.eq(cacheName, idxEntry.getValue().typeDescriptor().cacheName())) |
| idxIt.remove(); |
| } |
| |
| // Notify in-progress index operations. |
| for (SchemaOperation op : schemaOps.values()) { |
| if (op.started()) |
| op.manager().worker().cancel(); |
| } |
| |
| try { |
| ctx.indexProcessor().unregisterCache(cacheInfo); |
| |
| schemaMgr.onCacheDestroyed(cacheName, destroy, clearIdx); |
| |
| // Notify indexing. |
| if (idx != null) |
| idx.unregisterCache(cacheInfo); |
| } |
| catch (Exception e) { |
| U.error(log, "Failed to clear schema manager on cache unregister (will ignore): " + cacheName, e); |
| } |
| |
| cacheNames.remove(cacheName); |
| |
| Iterator<Long> missedCacheTypeIter = missedCacheTypes.iterator(); |
| |
| while (missedCacheTypeIter.hasNext()) { |
| long key = missedCacheTypeIter.next(); |
| |
| if (missedCacheTypeKeyMatches(key, cacheName)) |
| missedCacheTypeIter.remove(); |
| } |
| } |
| } |
| |
| /** |
| * Check whether the cache is configured for SQL. |
| * |
| * @param cfg Cache configuration. |
| * @return {@code true} if the cache configuration supports SQL, {@code false} otherwise. |
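| * |
| * <p>For instance, a configuration along these lines (a sketch; {@code Person} is a placeholder |
| * value class) is considered SQL-enabled: |
| * <pre>{@code |
| * CacheConfiguration<Long, Person> cfg = new CacheConfiguration<Long, Person>("persons") |
| *     .setQueryEntities(Collections.singleton(new QueryEntity(Long.class, Person.class))); |
| * }</pre> |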
| */ |
| private boolean cacheSupportSql(CacheConfiguration cfg) { |
| return !F.isEmpty(cfg.getQueryEntities()) |
| || !F.isEmpty(cfg.getSqlSchema()) |
| || !F.isEmpty(cfg.getSqlFunctionClasses()); |
| } |
| |
| /** |
| * Check whether the provided key and value belong to the expected cache and table. |
| * |
| * @param cctx Target cache context. |
| * @param expCacheName Expected cache name. |
| * @param expTblName Expected table name. |
| * @param key Key. |
| * @param val Value. |
| * @return {@code True} if this key-value pair belongs to expected cache/table, {@code false} otherwise or |
| * if cache or table doesn't exist. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @SuppressWarnings("ConstantConditions") |
| public boolean belongsToTable(GridCacheContext cctx, String expCacheName, String expTblName, KeyCacheObject key, |
| CacheObject val) throws IgniteCheckedException { |
| QueryTypeDescriptorImpl desc = type(expCacheName, val); |
| |
| if (desc == null) |
| return false; |
| |
| if (!F.eq(expTblName, desc.tableName())) |
| return false; |
| |
| if (!cctx.cacheObjects().isBinaryObject(val)) { |
| Class<?> valCls = val.value(cctx.cacheObjectContext(), false).getClass(); |
| |
| if (!desc.valueClass().isAssignableFrom(valCls)) |
| return false; |
| } |
| |
| if (!cctx.cacheObjects().isBinaryObject(key)) { |
| Class<?> keyCls = key.value(cctx.cacheObjectContext(), false).getClass(); |
| |
| if (!desc.keyClass().isAssignableFrom(keyCls)) |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Get table name by specified cache and cache value class. |
| * |
| * @param cacheName Cache name. |
| * @param valType Value type. |
| * @return Table name or {@code null} if there is no match. |
| */ |
| public @Nullable String tableName(String cacheName, String valType) { |
| int typeId = ctx.cacheObjects().typeId(valType); |
| |
| QueryTypeIdKey id = new QueryTypeIdKey(cacheName, typeId); |
| |
| QueryTypeDescriptorImpl desc = types.get(id); |
| |
| return desc == null ? null : desc.tableName(); |
| } |
| |
| /** |
| * Mark whether indexes of the given cache should be rebuilt. |
| * |
| * @param cctx Cache context. |
| * @param val {@code True} to mark indexes for rebuild, {@code false} to unmark. |
| */ |
| public void markAsRebuildNeeded(GridCacheContext cctx, boolean val) { |
| if (rebuildIsMeaningless(cctx)) |
| return; |
| |
| idxProc.markRebuildIndexesForCache(cctx, val); |
| |
| schemaMgr.markIndexRebuild(cctx.name(), val); |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @return True if index rebuild is meaningless (index module is disabled, local node is not an affinity |
| * node for this cache, or queries are not enabled for the cache). |
| */ |
| @SuppressWarnings("RedundantIfStatement") |
| private boolean rebuildIsMeaningless(GridCacheContext cctx) { |
| // Query module is disabled, nothing to rebuild. |
| if (!moduleEnabled()) |
| return true; |
| |
| // No data on non-affinity nodes. |
| if (!cctx.affinityNode()) |
| return true; |
| |
| // No indexes to rebuild when there are no QueryEntities. |
| if (!cctx.isQueryEnabled()) |
| return true; |
| |
| return false; |
| } |
| |
| /** |
| * Rebuilds indexes for provided caches from corresponding hash indexes. |
| * |
| * @param cctx Cache context. |
| * @param force Force rebuild indexes. |
| * @return Future that will be completed when rebuilding is finished. |
| */ |
| public IgniteInternalFuture<?> rebuildIndexesFromHash(GridCacheContext cctx, boolean force) { |
| assert nonNull(cctx); |
| |
| // Rebuild is meaningless (module disabled, non-affinity node or queries not enabled), nothing to do. |
| if (rebuildIsMeaningless(cctx)) |
| return chainIndexRebuildFuture(null, cctx); |
| |
| // No need to rebuild if cache has no data. |
| boolean empty = true; |
| |
| for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores()) { |
| if (!store.isEmpty()) { |
| empty = false; |
| |
| break; |
| } |
| } |
| |
| if (empty) |
| return chainIndexRebuildFuture(null, cctx); |
| |
| if (!busyLock.enterBusy()) { |
| return new GridFinishedFuture<>(new NodeStoppingException("Failed to rebuild indexes from hash " + |
| "(grid is stopping).")); |
| } |
| |
| try { |
| return rebuildIndexesFromHash0(cctx, force, new IndexRebuildCancelToken()); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * Rebuild indexes for cache. |
| * |
| * @param cctx Cache context. |
| * @param force Force rebuild indexes. |
| * @param cancelTok Cancel token. |
| * @return Future that will be completed when rebuilding is finished. |
| */ |
| private IgniteInternalFuture<?> rebuildIndexesFromHash0( |
| GridCacheContext<?, ?> cctx, |
| boolean force, |
| IndexRebuildCancelToken cancelTok |
| ) { |
| IgniteInternalFuture<?> idxFut = idxProc.rebuildIndexesForCache(cctx, force, cancelTok); |
| |
| return chainIndexRebuildFuture(idxFut, cctx); |
| } |
| |
| /** |
| * Chain real index rebuild future with user future and do some post processing. |
| * |
| * @param idxFut Real index future. If {@code null} simply completes existing user future. |
| * @param cctx Cache context. |
| * @return Chained user future. |
| */ |
| private @Nullable IgniteInternalFuture<?> chainIndexRebuildFuture( |
| @Nullable IgniteInternalFuture<?> idxFut, |
| GridCacheContext<?, ?> cctx |
| ) { |
| GridFutureAdapter<Void> res = idxRebuildFutStorage.indexRebuildFuture(cctx.cacheId()); |
| |
| assert res != null; |
| |
| if (idxFut != null) { |
| String cacheInfo = "[name=" + cctx.name() + ", grpName=" + cctx.group().name() + "]"; |
| |
| if (log.isInfoEnabled()) |
| log.info("Started indexes rebuilding for cache " + cacheInfo); |
| |
| idxFut.listen(() -> { |
| Throwable err = idxFut.error(); |
| |
| if (isNull(err)) { |
| if (log.isInfoEnabled()) |
| log.info("Finished indexes rebuilding for cache " + cacheInfo); |
| } |
| else if (!(err instanceof NodeStoppingException)) |
| log.error("Failed to rebuild indexes for cache " + cacheInfo, err); |
| |
| idxRebuildFutStorage.onFinishRebuildIndexes(cctx.cacheId(), err); |
| }); |
| |
| return res; |
| } |
| else { |
| idxRebuildFutStorage.onFinishRebuildIndexes(cctx.cacheId(), null); |
| |
| return null; |
| } |
| } |
| |
| /** |
| * @param cacheId Cache id. |
| * @return Future that will be completed when indexes for the given cache are restored. |
| */ |
| @Nullable public IgniteInternalFuture<?> indexRebuildFuture(int cacheId) { |
| return idxRebuildFutStorage.indexRebuildFuture(cacheId); |
| } |
| |
| /** |
| * Gets the cache object context. |
| * |
| * @param cacheName Cache name. |
| * @return Cache object context, {@code null} if the cache was not found. |
| */ |
| @Nullable private CacheObjectContext cacheObjectContext(String cacheName) { |
| GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName); |
| |
| return cache == null ? null : cache.context().cacheObjectContext(); |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @param newRow New row. |
| * @param prevRow Previous row. |
| * @param prevRowAvailable Whether the previous row is available. |
| * @throws IgniteCheckedException In case of error. |
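| * |
| * <p>Note: if the value type of {@code newRow} does not match any configured query type, the entry |
| * is stored in the cache but never appears as a SQL row. A configuration sketch that makes a type |
| * visible to SQL ({@code Person} is a placeholder): |
| * <pre>{@code |
| * CacheConfiguration<Long, Person> cfg = new CacheConfiguration<Long, Person>("persons") |
| *     .setIndexedTypes(Long.class, Person.class); |
| * }</pre> |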
| */ |
| public void store(GridCacheContext cctx, CacheDataRow newRow, @Nullable CacheDataRow prevRow, |
| boolean prevRowAvailable) |
| throws IgniteCheckedException { |
| assert cctx != null; |
| assert newRow != null; |
| assert prevRowAvailable || prevRow == null; |
| // No need to acquire busy lock here - operation is protected by GridCacheQueryManager.busyLock |
| |
| KeyCacheObject key = newRow.key(); |
| CacheObject val = newRow.value(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Store [cache=" + cctx.name() + ", key=" + key + ", val=" + val + "]"); |
| |
| String cacheName = cctx.name(); |
| |
| CacheObjectContext coctx = cctx.cacheObjectContext(); |
| |
| QueryTypeDescriptorImpl desc = typeByValue(cacheName, coctx, key, val, true); |
| |
| if (prevRowAvailable && prevRow != null) { |
| QueryTypeDescriptorImpl prevValDesc = typeByValue(cacheName, |
| coctx, |
| key, |
| prevRow.value(), |
| false); |
| |
| if (prevValDesc != desc) { |
| if (prevValDesc != null) { |
| idxProc.remove(cacheName, prevRow); |
| |
| if (idx != null) |
| idx.remove(cctx, prevValDesc, prevRow); |
| } |
| |
| // Row has already been removed from the other table's indexes. |
| prevRow = null; |
| } |
| } |
| |
| if (desc == null) { |
| int typeId = ctx.cacheObjects().typeId(val); |
| |
| long missedCacheTypeKey = missedCacheTypeKey(cacheName, typeId); |
| |
| if (!missedCacheTypes.contains(missedCacheTypeKey)) { |
| if (missedCacheTypes.add(missedCacheTypeKey)) { |
| LT.warn(log, "Key-value pair is not inserted into any SQL table [cacheName=" + cacheName + |
| ", " + describeTypeMismatch(cacheName, val) + "]"); |
| |
| LT.warn( |
| log, |
| " ^-- Value type(s) are specified via CacheConfiguration.indexedTypes or CacheConfiguration.queryEntities" |
| ); |
| LT.warn(log, " ^-- Make sure that same type(s) used when adding Object or BinaryObject to cache"); |
| LT.warn(log, " ^-- Otherwise, entries will be stored in cache, but not appear as SQL Table rows"); |
| } |
| } |
| |
| return; |
| } |
| |
| idxProc.store(cctx, newRow, prevRow, prevRowAvailable); |
| |
| if (idx != null) |
| idx.store(cctx, desc, newRow, prevRow, prevRowAvailable); |
| } |
| |
| /** |
| * Pretty-prints difference between expected and actual value types. |
| * |
| * @param cacheName Cache name. |
| * @param val Value object. |
| * @return Human readable type difference. |
| */ |
| private String describeTypeMismatch(String cacheName, Object val) { |
| try { |
| QueryTypeDescriptorImpl indexedType = null; |
| |
| for (QueryTypeIdKey typeKey : types.keySet()) { |
| if (typeKey.cacheName().equals(cacheName)) { |
| if (indexedType != null) { |
| // More than one type for table - simplified message. |
| indexedType = null; |
| break; |
| } |
| |
| indexedType = types.get(typeKey); |
| } |
| } |
| |
| boolean bin = ctx.cacheObjects().isBinaryObject(val); |
| |
| if (indexedType != null && bin && |
| !indexedType.valueTypeName().equals(((BinaryObject)val).type().typeName())) { |
| |
| return "expValType=" + indexedType.valueTypeName() |
| + ", actualValType=" + ((BinaryObject)val).type().typeName(); |
| } |
| else if (bin) |
| return "valType=" + ((BinaryObject)val).type().typeName(); |
| else |
| return "val=" + val.toString(); |
| } |
| catch (Exception e) { |
| return val.getClass().getName(); |
| } |
| } |
| |
| /** |
| * @param cacheName Cache name. |
| * @param coctx Cache context. |
| * @param key Key. |
| * @param val Value. |
| * @param checkType If {@code true} checks that key and value type correspond to found TypeDescriptor. |
| * @return Type descriptor if found and {@code null} otherwise. |
| * @throws IgniteCheckedException If type check failed. |
| */ |
| @SuppressWarnings("ConstantConditions") |
| @Nullable public QueryTypeDescriptorImpl typeByValue( |
| String cacheName, |
| CacheObjectContext coctx, |
| KeyCacheObject key, |
| CacheObject val, |
| boolean checkType |
| ) throws IgniteCheckedException { |
| Class<?> valCls = null; |
| |
| QueryTypeIdKey id; |
| |
| boolean binaryVal = ctx.cacheObjects().isBinaryObject(val); |
| |
| if (binaryVal) { |
| int typeId = ctx.cacheObjects().typeId(val); |
| |
| id = new QueryTypeIdKey(cacheName, typeId); |
| } |
| else { |
| valCls = val.value(coctx, false).getClass(); |
| |
| id = new QueryTypeIdKey(cacheName, valCls); |
| } |
| |
| QueryTypeDescriptorImpl desc = types.get(id); |
| |
| if (desc == null) |
| return null; |
| |
| if (checkType) { |
| if (!binaryVal && !desc.valueClass().isAssignableFrom(valCls)) |
| throw new IgniteCheckedException("Failed to update index due to class name conflict" + |
| "(multiple classes with same simple name are stored in the same cache) " + |
| "[expCls=" + desc.valueClass().getName() + ", actualCls=" + valCls.getName() + ']'); |
| |
| if (!ctx.cacheObjects().isBinaryObject(key)) { |
| Class<?> keyCls = key.value(coctx, false).getClass(); |
| |
| if (!desc.keyClass().isAssignableFrom(keyCls)) |
| throw new IgniteCheckedException("Failed to update index, incorrect key class [expCls=" + |
| desc.keyClass().getName() + ", actualCls=" + keyCls.getName() + "]"); |
| } |
| } |
| |
| return desc; |
| } |
| |
| /** |
| * Gets type descriptor for cache by given object's type. |
| * |
| * @param cacheName Cache name. |
| * @param val Object to determine type for. |
| * @return Type descriptor. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @SuppressWarnings("ConstantConditions") |
| private QueryTypeDescriptorImpl type(String cacheName, CacheObject val) throws IgniteCheckedException { |
| QueryTypeIdKey id; |
| |
| boolean binaryVal = ctx.cacheObjects().isBinaryObject(val); |
| |
| if (binaryVal) |
| id = new QueryTypeIdKey(cacheName, ctx.cacheObjects().typeId(val)); |
| else { |
| CacheObjectContext coctx = cacheObjectContext(cacheName); |
| |
| if (coctx == null) |
| throw new IgniteCheckedException("Object context for cache not found: " + cacheName); |
| |
| id = new QueryTypeIdKey(cacheName, val.value(coctx, false).getClass()); |
| } |
| |
| return types.get(id); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void checkIndexingEnabled() throws IgniteCheckedException { |
| if (idx == null) |
| throw new IgniteCheckedException("Indexing is disabled."); |
| } |
| |
| /** |
| * @throws IgniteException If indexing is disabled. |
| */ |
| private void checkxIndexingEnabled() throws IgniteException { |
| if (idx == null) |
| throw new IgniteException("Failed to execute query because indexing is disabled (consider adding module " + |
| INDEXING.module() + " to classpath or moving it from 'optional' to 'libs' folder)."); |
| } |
| |
| /** |
| * @throws IgniteException If indexing is disabled. |
| */ |
| private void checkxModuleEnabled() throws IgniteException { |
| if (!moduleEnabled()) { |
| throw new IgniteException("Failed to execute query because indexing is disabled and no query engine is " + |
| "configured (consider adding module " + INDEXING.module() + " to classpath or moving it " + |
| "from 'optional' to 'libs' folder or configuring any query engine with " + |
| "IgniteConfiguration.SqlConfiguration.QueryEnginesConfiguration property)."); |
| } |
| } |
| |
| /** |
| * Query SQL fields. |
| * |
| * @param qry Query. |
| * @param keepBinary Keep binary flag. |
| * @param failOnMultipleStmts If {@code true} the method must throw an exception when the query |
| * contains more than one SQL statement. |
| * @return Cursor. |
| */ |
| public List<FieldsQueryCursor<List<?>>> querySqlFields(final SqlFieldsQuery qry, final boolean keepBinary, |
| final boolean failOnMultipleStmts) { |
| return querySqlFields( |
| null, |
| qry, |
| null, |
| keepBinary, |
| failOnMultipleStmts |
| ); |
| } |
| |
| /** |
| * Query SQL fields. |
| * |
| * @param qry Query. |
| * @param keepBinary Keep binary flag. |
| * @return Cursor. |
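| * |
| * <p>Typical usage (a sketch; {@code qryProc} denotes this processor instance): |
| * <pre>{@code |
| * try (FieldsQueryCursor<List<?>> cur = qryProc.querySqlFields( |
| *     new SqlFieldsQuery("SELECT ID, NAME FROM Person WHERE ID = ?").setArgs(1L), false)) { |
| *     for (List<?> row : cur) |
| *         System.out.println(row.get(0) + " -> " + row.get(1)); |
| * } |
| * }</pre> |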
| */ |
| public FieldsQueryCursor<List<?>> querySqlFields(final SqlFieldsQuery qry, final boolean keepBinary) { |
| return querySqlFields( |
| null, |
| qry, |
| null, |
| keepBinary, |
| true |
| ).get(0); |
| } |
| |
| /** |
| * Query SQL fields. |
| * |
| * @param cctx Cache context. |
| * @param qry Query. |
| * @param cliCtx Client context. |
| * @param keepBinary Keep binary flag. |
| * @param failOnMultipleStmts If {@code true} the method must throw an exception when the query |
| * contains more than one SQL statement. |
| * @return Cursor. |
| */ |
| public List<FieldsQueryCursor<List<?>>> querySqlFields( |
| @Nullable final GridCacheContext<?, ?> cctx, |
| final SqlFieldsQuery qry, |
| final SqlClientContext cliCtx, |
| final boolean keepBinary, |
| final boolean failOnMultipleStmts |
| ) { |
| return querySqlFields( |
| cctx, |
| qry, |
| cliCtx, |
| keepBinary, |
| failOnMultipleStmts, |
| GridCacheQueryType.SQL_FIELDS, |
| null |
| ); |
| } |
| |
| /** |
| * Query SQL fields. |
| * |
| * @param cctx Cache context. |
| * @param qry Query. |
| * @param cliCtx Client context. |
| * @param keepBinary Keep binary flag. |
| * @param failOnMultipleStmts If {@code true} the method must throw an exception when the query |
| * contains more than one SQL statement. |
| * @param cancel Hook for query cancellation. |
| * @return Cursor. |
| */ |
| public List<FieldsQueryCursor<List<?>>> querySqlFields( |
| @Nullable final GridCacheContext<?, ?> cctx, |
| final SqlFieldsQuery qry, |
| final SqlClientContext cliCtx, |
| final boolean keepBinary, |
| final boolean failOnMultipleStmts, |
| @Nullable final GridQueryCancel cancel |
| ) { |
| return querySqlFields( |
| cctx, |
| qry, |
| cliCtx, |
| keepBinary, |
| failOnMultipleStmts, |
| GridCacheQueryType.SQL_FIELDS, |
| cancel |
| ); |
| } |
| |
| /** |
| * Query SQL fields. |
| * |
| * @param cctx Cache context. |
| * @param qry Query. |
| * @param cliCtx Client context. |
| * @param keepBinary Keep binary flag. |
| * @param failOnMultipleStmts If {@code true} the method must throw an exception when the query |
| * contains more than one SQL statement. |
| * @param qryType Real query type. |
| * @param cancel Hook for query cancellation. |
| * @return Cursor. |
| */ |
| public List<FieldsQueryCursor<List<?>>> querySqlFields( |
| @Nullable final GridCacheContext<?, ?> cctx, |
| final SqlFieldsQuery qry, |
| final SqlClientContext cliCtx, |
| final boolean keepBinary, |
| final boolean failOnMultipleStmts, |
| GridCacheQueryType qryType, |
| @Nullable final GridQueryCancel cancel |
| ) { |
| checkxModuleEnabled(); |
| |
| if (qry.isDistributedJoins() && qry.getPartitions() != null) |
| throw new CacheException("Using both partitions and distributed JOINs is not supported for the same query"); |
| |
| if (qry.isLocal() && ctx.clientNode()) |
| throw new CacheException("Execution of local SqlFieldsQuery on client node disallowed."); |
| |
| return executeQuerySafe(cctx, () -> { |
| final String schemaName = qry.getSchema() == null ? schemaName(cctx) : qry.getSchema(); |
| |
| IgniteOutClosureX<List<FieldsQueryCursor<List<?>>>> clo = |
| new IgniteOutClosureX<List<FieldsQueryCursor<List<?>>>>() { |
| @Override public List<FieldsQueryCursor<List<?>>> applyx() { |
| GridQueryCancel cancel0 = cancel != null ? cancel : new GridQueryCancel(); |
| |
| List<FieldsQueryCursor<List<?>>> res; |
| |
| QueryEngine qryEngine = engineForQuery(cliCtx, qry); |
| |
| if (qryEngine != null) { |
| QueryProperties qryProps = new QueryProperties(cctx == null ? null : cctx.name(), keepBinary); |
| |
| if (qry instanceof SqlFieldsQueryEx && ((SqlFieldsQueryEx)qry).isBatched()) { |
| res = qryEngine.queryBatched( |
| QueryContext.of(qry, cliCtx, cancel, qryProps), |
| schemaName, |
| qry.getSql(), |
| ((SqlFieldsQueryEx)qry).batchedArguments() |
| ); |
| } |
| else { |
| res = qryEngine.query( |
| QueryContext.of(qry, cliCtx, cancel, qryProps), |
| schemaName, |
| qry.getSql(), |
| qry.getArgs() != null ? qry.getArgs() : X.EMPTY_OBJECT_ARRAY |
| ); |
| } |
| } |
| else { |
| res = idx.querySqlFields( |
| schemaName, |
| qry, |
| cliCtx, |
| keepBinary, |
| failOnMultipleStmts, |
| cancel0 |
| ); |
| } |
| |
| if (cctx != null) |
| sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx, qryType); |
| |
| return res; |
| } |
| }; |
| |
| return executeQuery(qryType, qry.getSql(), cctx, clo, true); |
| }); |
| } |
| |
| /** */ |
| public List<JdbcParameterMeta> parameterMetaData( |
| final SqlFieldsQuery qry, |
| @Nullable final SqlClientContext cliCtx |
| ) { |
| checkxModuleEnabled(); |
| |
| return executeQuerySafe(null, () -> { |
| final String schemaName = qry.getSchema() == null ? QueryUtils.DFLT_SCHEMA : qry.getSchema(); |
| |
| QueryEngine qryEngine = engineForQuery(cliCtx, qry); |
| |
| if (qryEngine != null) { |
| List<List<GridQueryFieldMetadata>> meta = qryEngine.parameterMetaData( |
| QueryContext.of(qry, cliCtx), |
| schemaName, |
| qry.getSql()); |
| |
| return meta.stream() |
| .flatMap(m -> m.stream().map(JdbcParameterMeta::new)) |
| .collect(Collectors.toList()); |
| } |
| else |
| return idx.parameterMetaData(schemaName, qry); |
| |
| }); |
| } |
| |
| /** */ |
| public List<GridQueryFieldMetadata> resultSetMetaData( |
| final SqlFieldsQuery qry, |
| @Nullable final SqlClientContext cliCtx |
| ) { |
| checkxModuleEnabled(); |
| |
| return executeQuerySafe(null, () -> { |
| final String schemaName = qry.getSchema() == null ? QueryUtils.DFLT_SCHEMA : qry.getSchema(); |
| |
| QueryEngine qryEngine = engineForQuery(cliCtx, qry); |
| |
| if (qryEngine != null) { |
| List<List<GridQueryFieldMetadata>> meta = qryEngine.resultSetMetaData( |
| QueryContext.of(qry, cliCtx), |
| schemaName, |
| qry.getSql()); |
| |
| if (meta.size() == 1) |
| return meta.get(0); |
| |
| return null; |
| } |
| else |
| return idx.resultMetaData(schemaName, qry); |
| }); |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @return Schema name. |
| */ |
| public String schemaName(GridCacheContext<?, ?> cctx) { |
| if (cctx != null) { |
| String cacheSchemaName = schemaMgr.schemaName(cctx.name()); |
| |
| if (!F.isEmpty(cacheSchemaName)) |
| return cacheSchemaName; |
| } |
| |
| return QueryUtils.DFLT_SCHEMA; |
| } |
| |
| /** |
| * Finds query engine to execute given query. |
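| * <p> |
| * The engine may be forced per query by a {@code QUERY_ENGINE('...')} hint embedded in a leading SQL |
| * comment; otherwise the client context engine and finally the default engine are used. A sketch of |
| * the hint form (the string is split only to keep this Javadoc well-formed; the exact hint grammar is |
| * defined by {@code QRY_HINT_PATTERN} and {@code QRY_ENGINE_PATTERN}): |
| * <pre>{@code |
| * String sql = "SELECT /" + "*+ QUERY_ENGINE('calcite') *" + "/ ID FROM Person"; |
| * }</pre> |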
| */ |
| private QueryEngine engineForQuery(SqlClientContext cliCtx, SqlFieldsQuery qry) { |
| String engineName = null; |
| |
| // Check hint. |
| Matcher hintMatcher = QRY_HINT_PATTERN.matcher(qry.getSql()); |
| |
| if (hintMatcher.find()) { |
| String hint = hintMatcher.group(1); |
| |
| Matcher engineMatcher = QRY_ENGINE_PATTERN.matcher(hint); |
| |
| if (engineMatcher.find()) |
| engineName = engineMatcher.group(1); |
| } |
| |
| // Check engine in client context. |
| if (engineName == null && cliCtx != null) |
| engineName = cliCtx.queryEngine(); |
| |
| if (engineName == null) |
| return dfltQryEngine; |
| |
| if (qryEnginesCfg == null) // Query engines not configured. |
| throw new IgniteException("Query engines not configured, but specified engine: " + engineName); |
| |
| // There are only one or two query engines in the array, so iterating in a loop is faster than a hash map lookup. |
| for (int i = 0; i < qryEnginesCfg.length; i++) { |
| if (engineName.equalsIgnoreCase(qryEnginesCfg[i].engineName())) |
| return qryEngines[i]; |
| } |
| |
| throw new IgniteException("Query engine not found: " + engineName); |
| } |
| |
| /** |
| * Execute query setting busy lock, preserving current cache context and properly handling checked exceptions. |
| * |
| * @param cctx Cache context. |
| * @param supplier Code to be executed. |
| * @return Result. |
| */ |
| private <T> T executeQuerySafe(@Nullable final GridCacheContext<?, ?> cctx, GridPlainOutClosure<T> supplier) { |
| GridCacheContext oldCctx = curCache.get(); |
| |
| curCache.set(cctx); |
| |
| if (!busyLock.enterBusy()) |
| throw new IllegalStateException("Failed to execute query (grid is stopping)."); |
| |
| try { |
| return supplier.apply(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new CacheException(e); |
| } |
| finally { |
| curCache.set(oldCctx); |
| |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @param cacheName Cache name. |
| * @param schemaName Schema name. |
| * @param streamer Data streamer. |
| * @param qry Query. |
| * @param args Query arguments. |
| * @param qryInitiatorId Query initiator identifier. |
| * @return Update counter. |
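| * |
| * <p>Sketch (names are illustrative; {@code ignite} is the node handle, {@code qryProc} this processor): |
| * <pre>{@code |
| * try (IgniteDataStreamer<Object, Object> streamer = ignite.dataStreamer("persons")) { |
| *     qryProc.streamUpdateQuery("persons", "PUBLIC", streamer, |
| *         "INSERT INTO Person(ID, NAME) VALUES (?, ?)", new Object[] {1L, "Ann"}, null); |
| * } |
| * }</pre> |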
| */ |
| public long streamUpdateQuery(@Nullable final String cacheName, final String schemaName, |
| final IgniteDataStreamer<?, ?> streamer, final String qry, final Object[] args, |
| String qryInitiatorId) { |
| assert streamer != null; |
| |
| if (!busyLock.enterBusy()) |
| throw new IllegalStateException("Failed to execute query (grid is stopping)."); |
| |
| try { |
| GridCacheContext cctx = ctx.cache().cache(cacheName).context(); |
| |
| return executeQuery(GridCacheQueryType.SQL_FIELDS, qry, cctx, new IgniteOutClosureX<Long>() { |
| @Override public Long applyx() throws IgniteCheckedException { |
| return idx.streamUpdateQuery(schemaName, qry, args, streamer, qryInitiatorId); |
| } |
| }, true); |
| } |
| catch (IgniteCheckedException e) { |
| throw new CacheException(e); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @param schemaName Schema name. |
| * @param cliCtx Client context. |
| * @param qry Query. |
| * @param args Query arguments. |
| * @param qryInitiatorId Query initiator identifier. |
| * @return Update counters. |
| */ |
| public List<Long> streamBatchedUpdateQuery(final String schemaName, final SqlClientContext cliCtx, |
| final String qry, final List<Object[]> args, String qryInitiatorId) { |
| checkxIndexingEnabled(); |
| |
| if (!busyLock.enterBusy()) |
| throw new IllegalStateException("Failed to execute query (grid is stopping)."); |
| |
| try { |
| return executeQuery(GridCacheQueryType.SQL_FIELDS, qry, null, new IgniteOutClosureX<List<Long>>() { |
| @Override public List<Long> applyx() throws IgniteCheckedException { |
| return idx.streamBatchedUpdateQuery(schemaName, qry, args, cliCtx, qryInitiatorId); |
| } |
| }, true); |
| } |
| catch (IgniteCheckedException e) { |
| throw new CacheException(e); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * Execute distributed SQL query. |
| * |
| * @param cctx Cache context. |
| * @param qry Query. |
| * @param keepBinary Keep binary flag. |
| * @return Cursor. |
| */ |
| public <K, V> QueryCursor<Cache.Entry<K, V>> querySql( |
| final GridCacheContext<?, ?> cctx, |
| final SqlQuery qry, |
| boolean keepBinary |
| ) { |
| // Generate. |
| String type = qry.getType(); |
| |
| String typeName = typeName(cctx.name(), type); |
| |
| qry.setType(typeName); |
| |
| SqlFieldsQuery fieldsQry = idx.generateFieldsQuery(cctx.name(), qry); |
| |
| // Execute. |
| FieldsQueryCursor<List<?>> res = querySqlFields( |
| cctx, |
| fieldsQry, |
| null, |
| keepBinary, |
| true, |
| GridCacheQueryType.SQL, |
| null |
| ).get(0); |
| |
| // Convert. |
| QueryKeyValueIterable<K, V> converted = new QueryKeyValueIterable<>(res); |
| |
| return new QueryCursorImpl<Cache.Entry<K, V>>(converted) { |
| @Override public void close() { |
| converted.cursor().close(); |
| } |
| }; |
| } |
| |
| /** |
| * Collect local queries that already running more than specified duration. |
| * |
| * @param duration Duration to check. |
| * @return Collection of long running queries. |
| */ |
| public Collection<GridRunningQueryInfo> runningQueries(long duration) { |
| return runningQryMgr.runningQueries(duration); |
| } |
| |
| /** |
| * Cancel query running on remote or local Node. |
| * |
| * @param queryId Query id. |
| * @param nodeId Node id; if {@code null}, the local query is cancelled. |
| * @param async If {@code true}, execute asynchronously. |
| */ |
| public void cancelQuery(long queryId, @Nullable UUID nodeId, boolean async) { |
| runningQryMgr.cancelQuery(queryId, nodeId, async); |
| } |
| |
| /** |
| * Cancel specified queries. |
| * |
| * @param queries IDs of the queries to cancel. |
| */ |
| public void cancelLocalQueries(Collection<Long> queries) { |
| if (!F.isEmpty(queries)) { |
| for (Long qryId : queries) |
| runningQryMgr.cancelLocalQuery(qryId); |
| } |
| } |
| |
| /** |
| * Entry point for index procedure. |
| * |
| * @param cacheName Cache name. |
| * @param schemaName Schema name. |
| * @param tblName Table name. |
| * @param idx Index. |
| * @param ifNotExists When set to {@code true} the operation is a no-op if the index already exists. |
| * @param parallel Index creation parallelism level. |
| * @return Future completed when index is created. |
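| * |
| * <p>Call sketch (argument values are illustrative; {@code qryProc} denotes this processor): |
| * <pre>{@code |
| * QueryIndex idx = new QueryIndex("NAME").setName("PERSON_NAME_IDX"); |
| * |
| * qryProc.dynamicIndexCreate("persons", "PUBLIC", "PERSON", idx, true, 4).get(); |
| * }</pre> |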
| */ |
| public IgniteInternalFuture<?> dynamicIndexCreate(String cacheName, String schemaName, String tblName, |
| QueryIndex idx, boolean ifNotExists, int parallel) { |
| SchemaAbstractOperation op = new SchemaIndexCreateOperation(UUID.randomUUID(), cacheName, schemaName, tblName, |
| idx, ifNotExists, parallel); |
| |
| return startIndexOperationDistributed(op); |
| } |
| |
| /** |
| * Entry point for index drop procedure. |
| * |
| * @param cacheName Cache name. |
| * @param schemaName Schema name. |
| * @param idxName Index name. |
| * @param ifExists When set to {@code true} the operation is a no-op if the index does not exist. |
| * @return Future completed when index is dropped. |
| */ |
| public IgniteInternalFuture<?> dynamicIndexDrop(String cacheName, String schemaName, String idxName, |
| boolean ifExists) { |
| SchemaAbstractOperation op = new SchemaIndexDropOperation(UUID.randomUUID(), cacheName, schemaName, idxName, |
| ifExists); |
| |
| return startIndexOperationDistributed(op); |
| } |
| |
| /** |
| * Entry point for add column procedure. |
| * |
| * @param cacheName Cache name. |
| * @param schemaName Schema name. |
| * @param tblName Target table name. |
| * @param cols Columns to add. |
| * @param ifTblExists Ignore operation if target table doesn't exist. |
| * @param ifNotExists Ignore operation if column exists. |
| * @return Future completed when the column is added. |
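| * |
| * <p>For example (a sketch; {@code qryProc} denotes this processor): |
| * <pre>{@code |
| * QueryField age = new QueryField("AGE", Integer.class.getName(), true); |
| * |
| * qryProc.dynamicColumnAdd("persons", "PUBLIC", "PERSON", Collections.singletonList(age), false, true).get(); |
| * }</pre> |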
| */ |
| public IgniteInternalFuture<?> dynamicColumnAdd(String cacheName, String schemaName, String tblName, |
| List<QueryField> cols, boolean ifTblExists, boolean ifNotExists) { |
| |
| SchemaAlterTableAddColumnOperation op = new SchemaAlterTableAddColumnOperation(UUID.randomUUID(), cacheName, |
| schemaName, tblName, cols, ifTblExists, ifNotExists); |
| |
| return startIndexOperationDistributed(op); |
| } |
| |
| /** |
| * Entry point for drop column procedure. |
| * |
| * @param cacheName Cache name. |
| * @param schemaName Schema name. |
| * @param tblName Target table name. |
| * @param cols Columns to drop. |
| * @param ifTblExists Ignore operation if target table doesn't exist. |
| * @param ifExists Ignore operation if column does not exist. |
| * @return Future completed when the column is dropped. |
| */ |
| public IgniteInternalFuture<?> dynamicColumnRemove(String cacheName, String schemaName, String tblName, |
| List<String> cols, boolean ifTblExists, boolean ifExists) { |
| |
| SchemaAlterTableDropColumnOperation op = new SchemaAlterTableDropColumnOperation(UUID.randomUUID(), cacheName, |
| schemaName, tblName, cols, ifTblExists, ifExists); |
| |
| return startIndexOperationDistributed(op); |
| } |
| |
| /** |
| * Dynamically enable indexing of an existing cache. |
| * |
| * @param cacheName Cache name. |
| * @param schemaName Target schema name. |
| * @param entity Instance of {@code QueryEntity}. |
| * @param qryParallelism Query parallelism. |
| * @param sqlEscape Escape flag, see {@link QueryUtils#normalizeQueryEntity}. |
| * @return Future completed when indexing is enabled. |
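| * |
| * <p>Sketch: enable SQL over an existing cache ({@code Person} and {@code qryProc} are placeholders): |
| * <pre>{@code |
| * QueryEntity entity = new QueryEntity(Long.class, Person.class).setTableName("PERSON"); |
| * |
| * qryProc.dynamicAddQueryEntity("persons", "PUBLIC", entity, null, false).get(); |
| * }</pre> |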
| */ |
| public IgniteInternalFuture<?> dynamicAddQueryEntity( |
| String cacheName, |
| String schemaName, |
| QueryEntity entity, |
| Integer qryParallelism, |
| boolean sqlEscape |
| ) { |
| assert qryParallelism == null || qryParallelism > 0; |
| |
| CacheConfiguration cfg = ctx.cache().cacheConfiguration(cacheName); |
| |
| if (qryParallelism != null && qryParallelism > 1 && cfg.getCacheMode() != PARTITIONED) |
| throw new IgniteSQLException("Segmented indices are supported for PARTITIONED mode only."); |
| |
| QueryEntity entity0 = QueryUtils.normalizeQueryEntity(ctx, entity, sqlEscape); |
| |
| SchemaAddQueryEntityOperation op = new SchemaAddQueryEntityOperation( |
| UUID.randomUUID(), |
| cacheName, |
| schemaName, |
| Collections.singletonList(entity0), |
| qryParallelism != null ? qryParallelism : CacheConfiguration.DFLT_QUERY_PARALLELISM, |
| sqlEscape); |
| |
| return startIndexOperationDistributed(op); |
| } |
| |
| /** |
| * Start distributed index change operation. |
| * |
| * @param op Operation. |
| * @return Future. |
| */ |
| private IgniteInternalFuture<?> startIndexOperationDistributed(SchemaAbstractOperation op) { |
| SchemaOperationClientFuture fut = new SchemaOperationClientFuture(op.id()); |
| |
| SchemaOperationClientFuture oldFut = schemaCliFuts.put(op.id(), fut); |
| |
| assert oldFut == null; |
| |
| try { |
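| // Propose the schema change to the whole cluster via a custom discovery message. |
| // The client future completes when the corresponding finish message is processed. |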
| ctx.discovery().sendCustomEvent(new SchemaProposeDiscoveryMessage(op)); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Sent schema propose discovery message [opId=" + op.id() + ", op=" + op + ']'); |
| |
| boolean disconnected0; |
| |
| synchronized (stateMux) { |
| disconnected0 = disconnected; |
| } |
| |
| if (disconnected0) { |
| fut.onDone(new SchemaOperationException("Client node is disconnected (operation result is unknown).")); |
| |
| schemaCliFuts.remove(op.id()); |
| } |
| } |
| catch (Exception e) { |
| if (e instanceof SchemaOperationException) |
| fut.onDone(e); |
| else { |
| fut.onDone(new SchemaOperationException("Failed to start schema change operation due to " + |
| "unexpected exception [opId=" + op.id() + ", op=" + op + ']', e)); |
| } |
| |
| schemaCliFuts.remove(op.id()); |
| } |
| |
| return fut; |
| } |
| |
| /** |
| * @param sqlQry SQL query. |
| * @param params Query parameters. |
| * @param cctx Cache context. |
| * @param qryType Actual query type, usually either SQL or SQL_FIELDS. |
| */ |
| private void sendQueryExecutedEvent( |
| String sqlQry, |
| Object[] params, |
| GridCacheContext<?, ?> cctx, |
| GridCacheQueryType qryType |
| ) { |
| if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { |
| ctx.event().record(new CacheQueryExecutedEvent<>( |
| ctx.discovery().localNode(), |
| qryType.name() + " query executed.", |
| EVT_CACHE_QUERY_EXECUTED, |
| qryType.name(), |
| cctx.name(), |
| null, |
| sqlQry, |
| null, |
| null, |
| params, |
| ctx.localNodeId(), |
| null)); |
| } |
| } |
| |
| /** |
| * Update type descriptor with new fields metadata. |
| * |
| * @param d Type descriptor to update. |
| * @param cols Columns to add. |
| * @throws IgniteCheckedException If failed to update type descriptor. |
| */ |
| private void processDynamicAddColumn(QueryTypeDescriptorImpl d, List<QueryField> cols) |
| throws IgniteCheckedException { |
| List<GridQueryProperty> props = new ArrayList<>(cols.size()); |
| |
| for (QueryField col : cols) { |
| try { |
| props.add(new QueryBinaryProperty( |
| ctx, |
| col.name(), |
| null, |
| Class.forName(col.typeName()), |
| false, |
| null, |
| !col.isNullable(), |
| null, |
| col.precision(), |
| col.scale())); |
| } |
| catch (ClassNotFoundException e) { |
| throw new SchemaOperationException("Class not found for new property: " + col.typeName(), e); |
| } |
| } |
| |
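| // All columns resolved successfully: register the new properties with the type descriptor |
| // (the boolean flag is expected to fail the operation on a duplicate column name). |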
| for (GridQueryProperty p : props) |
| d.addProperty(p, true); |
| } |
| |
| /** |
| * Remove fields from type descriptor. |
| * |
| * @param d Type descriptor to update. |
| * @param cols Columns to remove. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void processDynamicDropColumn(QueryTypeDescriptorImpl d, List<String> cols) |
| throws IgniteCheckedException { |
| for (String field : cols) |
| d.removeProperty(field); |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @param row Row removed from cache. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| public void remove(GridCacheContext cctx, CacheDataRow row) |
| throws IgniteCheckedException { |
| assert row != null; |
| // No need to acquire busy lock here - operation is protected by GridCacheQueryManager.busyLock |
| |
| if (log.isDebugEnabled()) |
| log.debug("Remove [cacheName=" + cctx.name() + ", key=" + row.key() + ", val=" + row.value() + "]"); |
| |
| QueryTypeDescriptorImpl desc = typeByValue(cctx.name(), |
| cctx.cacheObjectContext(), |
| row.key(), |
| row.value(), |
| false); |
| |
| if (desc == null) |
| return; |
| |
| idxProc.remove(cctx.name(), row); |
| |
| if (indexingEnabled()) |
| idx.remove(cctx, desc, row); |
| } |
| |
| /** |
| * @param cacheName Cache name. |
| * @param clause Clause. |
| * @param resType Result type. |
| * @param filters Key and value filters. |
| * @param <K> Key type. |
| * @param <V> Value type. |
| * @param limit Limits the number of returned records. If 0 or less, the limit is considered |
| * to be {@code Integer.MAX_VALUE}, that is, virtually no limit. |
| * @return Key/value rows. |
| * @throws IgniteCheckedException If failed. |
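| * |
| * The user-facing equivalent is a {@code TextQuery} run against the cache (illustrative |
| * sketch; the cache and value type are hypothetical): |
| * <pre>{@code |
| * QueryCursor<Cache.Entry<Integer, Person>> cur = |
| *     cache.query(new TextQuery<Integer, Person>(Person.class, "john").setLimit(10)); |
| * }</pre> |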
| */ |
| public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryText(final String cacheName, final String clause, |
| final String resType, final IndexingQueryFilter filters, int limit) throws IgniteCheckedException { |
| checkIndexingEnabled(); |
| |
| if (!busyLock.enterBusy()) |
| throw new IllegalStateException("Failed to execute query (grid is stopping)."); |
| |
| try { |
| final GridCacheContext<?, ?> cctx = ctx.cache().internalCache(cacheName).context(); |
| |
| return executeQuery(GridCacheQueryType.TEXT, clause, cctx, |
| new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, V>>>() { |
| @Override public GridCloseableIterator<IgniteBiTuple<K, V>> applyx() throws IgniteCheckedException { |
| String typeName = typeName(cacheName, resType); |
| String schemaName = schemaMgr.schemaName(cacheName); |
| |
| return idx.queryLocalText(schemaName, cacheName, clause, typeName, filters, limit); |
| } |
| }, true); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @param <K> Key type. |
| * @param <V> Value type. |
| * @param cacheName Cache name. |
| * @param valCls Cache value class. |
| * @param idxQryDesc Index query description. |
| * @param entryFilter Optional user-defined cache entry filter. |
| * @param cacheFilter Ignite-specific cache entry filter. |
| * @param keepBinary Keep binary flag. |
| * @return Key/value rows. |
| * @throws IgniteCheckedException If failed. |
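| * |
| * The user-facing equivalent is an {@code IndexQuery} (illustrative sketch; the cache, value |
| * type and field name are hypothetical, and a static import of |
| * {@code IndexQueryCriteriaBuilder.gt} is assumed): |
| * <pre>{@code |
| * QueryCursor<Cache.Entry<Integer, Person>> cur = cache.query( |
| *     new IndexQuery<Integer, Person>(Person.class).setCriteria(gt("age", 30))); |
| * }</pre> |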
| */ |
| public <K, V> IndexQueryResult<K, V> queryIndex( |
| String cacheName, |
| String valCls, |
| final IndexQueryDesc idxQryDesc, |
| @Nullable IgniteBiPredicate<K, V> entryFilter, |
| final IndexingQueryFilter cacheFilter, |
| boolean keepBinary |
| ) throws IgniteCheckedException { |
| if (!busyLock.enterBusy()) |
| throw new IllegalStateException("Failed to execute query (grid is stopping)."); |
| |
| try { |
| final GridCacheContext<K, V> cctx = (GridCacheContext<K, V>)ctx.cache().internalCache(cacheName).context(); |
| |
| return executeQuery(GridCacheQueryType.INDEX, valCls, cctx, |
| new IgniteOutClosureX<IndexQueryResult<K, V>>() { |
| @Override public IndexQueryResult<K, V> applyx() throws IgniteCheckedException { |
| try { |
| return idxQryPrc.queryLocal(cctx, idxQryDesc, entryFilter, cacheFilter, keepBinary); |
| } |
| catch (IgniteCheckedException e) { |
| String msg = "Failed to execute IndexQuery: " + e.getMessage() + ". Query desc: " + idxQryDesc; |
| |
| throw new IgniteCheckedException(msg, e); |
| } |
| } |
| }, true); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * Gets types for cache. |
| * |
| * @param cacheName Cache name. |
| * @return Descriptors. |
| */ |
| public Collection<GridQueryTypeDescriptor> types(@Nullable String cacheName) { |
| Collection<GridQueryTypeDescriptor> cacheTypes = newSetFromMap(new IdentityHashMap<>()); |
| |
| for (Map.Entry<QueryTypeIdKey, QueryTypeDescriptorImpl> e : types.entrySet()) { |
| if (F.eq(e.getKey().cacheName(), cacheName)) |
| cacheTypes.add(e.getValue()); |
| } |
| |
| return cacheTypes; |
| } |
| |
| /** |
| * Get type descriptor for the given cache and table name. |
| * @param cacheName Cache name. |
| * @param tblName Table name. |
| * @return Type (if any). |
| */ |
| @Nullable private QueryTypeDescriptorImpl type(@Nullable String cacheName, String tblName) { |
| for (QueryTypeDescriptorImpl type : types.values()) { |
| if (F.eq(cacheName, type.cacheName()) && F.eq(tblName, type.tableName())) |
| return type; |
| } |
| |
| return null; |
| } |
| |
| /** |
| * Gets type name for provided cache name and type name if type is still valid. |
| * |
| * @param cacheName Cache name. |
| * @param typeName Type name. |
| * @return Type name. |
| * @throws IgniteException If the type is not found. |
| */ |
| private String typeName(@Nullable String cacheName, String typeName) throws IgniteException { |
| QueryTypeDescriptorImpl type = typesByName.get(new QueryTypeNameKey(cacheName, typeName)); |
| |
| if (type == null) |
| throw new IgniteException("Failed to find SQL table for type: " + typeName); |
| |
| return type.name(); |
| } |
| |
| /** |
| * Gets type descriptor for provided cache name and type name if type is still valid. |
| * |
| * @param cacheName Cache name. |
| * @param typeName Type name. |
| * @return Query type descriptor or {@code null} if descriptor was not found. |
| */ |
| public @Nullable GridQueryTypeDescriptor typeDescriptor(@Nullable String cacheName, String typeName) { |
| return typesByName.get(new QueryTypeNameKey(cacheName, typeName)); |
| } |
| |
| /** |
| * @param qryType Query type. |
| * @param qry Query description. |
| * @param cctx Cache context. |
| * @param clo Closure to execute. |
| * @param complete Whether metrics and trace logging should be collected on successful |
| * completion; on failure they are always collected. |
| * @return Closure result. |
| * @throws IgniteCheckedException If failed. |
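| * |
| * A minimal sketch of the intended usage (illustrative only): |
| * <pre>{@code |
| * Integer res = executeQuery(GridCacheQueryType.SQL_FIELDS, "SELECT 1", cctx, |
| *     new IgniteOutClosureX<Integer>() { |
| *         @Override public Integer applyx() { |
| *             return 1; // Actual query work goes here. |
| *         } |
| *     }, true); |
| * }</pre> |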
| */ |
| public <R> R executeQuery(GridCacheQueryType qryType, String qry, @Nullable GridCacheContext<?, ?> cctx, |
| IgniteOutClosureX<R> clo, boolean complete) throws IgniteCheckedException { |
| final long startTime = U.currentTimeMillis(); |
| |
| Throwable err = null; |
| |
| R res = null; |
| |
| try { |
| res = clo.apply(); |
| |
| if (res instanceof CacheQueryFuture) { |
| CacheQueryFuture fut = (CacheQueryFuture)res; |
| |
| err = fut.error(); |
| } |
| |
| return res; |
| } |
| catch (GridClosureException e) { |
| err = e.unwrap(); |
| |
| throw (IgniteCheckedException)err; |
| } |
| catch (CacheException | IgniteException e) { |
| err = e; |
| |
| throw e; |
| } |
| catch (Exception e) { |
| err = e; |
| |
| throw new IgniteCheckedException(e); |
| } |
| finally { |
| boolean failed = err != null; |
| |
| long duration = U.currentTimeMillis() - startTime; |
| |
| if (complete || failed) { |
| if (cctx != null) |
| cctx.queries().collectMetrics(qryType, qry, startTime, duration, failed); |
| |
| if (log.isTraceEnabled()) |
| log.trace("Query execution [startTime=" + startTime + ", duration=" + duration + |
| ", fail=" + failed + ", res=" + res + ']'); |
| } |
| } |
| } |
| |
| /** |
| * Send status message to coordinator node. |
| * |
| * @param destNodeId Destination node ID. |
| * @param opId Operation ID. |
| * @param err Error. |
| * @param nop No-op flag. |
| */ |
| public void sendStatusMessage(UUID destNodeId, UUID opId, SchemaOperationException err, boolean nop) { |
| if (log.isDebugEnabled()) |
| log.debug("Sending schema operation status message [opId=" + opId + ", crdNode=" + destNodeId + |
| ", err=" + err + ", nop=" + nop + ']'); |
| |
| try { |
| byte[] errBytes = marshalSchemaError(opId, err); |
| |
| SchemaOperationStatusMessage msg = new SchemaOperationStatusMessage(opId, errBytes, nop); |
| |
| // Messages must go to dedicated schema pool. We cannot push them to query pool because in this case |
| // they could be blocked with other query requests. |
| ctx.io().sendToGridTopic(destNodeId, TOPIC_SCHEMA, msg, SCHEMA_POOL); |
| } |
| catch (IgniteCheckedException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to send schema status response [opId=" + opId + ", destNodeId=" + destNodeId + |
| ", err=" + e + ", nop=" + nop + ']'); |
| } |
| } |
| |
| /** |
| * Process status message. |
| * |
| * @param msg Status message. |
| */ |
| private void processStatusMessage(SchemaOperationStatusMessage msg) { |
| synchronized (stateMux) { |
| if (completedOpIds.contains(msg.operationId())) { |
| // Received message from a node which joined topology in the middle of operation execution. |
| if (log.isDebugEnabled()) |
| log.debug("Received status message for completed operation (will ignore) [" + |
| "opId=" + msg.operationId() + ", sndNodeId=" + msg.senderNodeId() + ']'); |
| |
| return; |
| } |
| |
| UUID opId = msg.operationId(); |
| |
| SchemaProposeDiscoveryMessage proposeMsg = activeProposals.get(opId); |
| |
| if (proposeMsg != null) { |
| SchemaOperation op = schemaOps.get(proposeMsg.schemaName()); |
| |
| if (op != null && F.eq(op.id(), opId) && op.started() && coordinator().isLocal()) { |
| if (log.isDebugEnabled()) |
| log.debug("Received status message [opId=" + msg.operationId() + |
| ", sndNodeId=" + msg.senderNodeId() + ']'); |
| |
| op.manager().onNodeFinished(msg.senderNodeId(), unmarshalSchemaError(msg.errorBytes()), msg.nop()); |
| |
| return; |
| } |
| } |
| |
| // Put to pending set if operation is not visible/ready yet. |
| pendingMsgs.add(msg); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Received status message (added to pending set) [opId=" + msg.operationId() + |
| ", sndNodeId=" + msg.senderNodeId() + ']'); |
| } |
| } |
| |
| /** |
| * Unwind pending messages for a particular operation. |
| * |
| * @param opId Operation ID. |
| * @param mgr Manager. |
| */ |
| private void unwindPendingMessages(UUID opId, SchemaOperationManager mgr) { |
| assert Thread.holdsLock(stateMux); |
| |
| Iterator<SchemaOperationStatusMessage> it = pendingMsgs.iterator(); |
| |
| while (it.hasNext()) { |
| SchemaOperationStatusMessage msg = it.next(); |
| |
| if (F.eq(msg.operationId(), opId)) { |
| mgr.onNodeFinished(msg.senderNodeId(), unmarshalSchemaError(msg.errorBytes()), msg.nop()); |
| |
| it.remove(); |
| } |
| } |
| } |
| |
| /** |
| * Marshal schema error. |
| * |
| * @param opId Operation ID. |
| * @param err Error. |
| * @return Error bytes. |
| */ |
| @Nullable private byte[] marshalSchemaError(UUID opId, @Nullable SchemaOperationException err) { |
| if (err == null) |
| return null; |
| |
| try { |
| return U.marshal(marsh, err); |
| } |
| catch (Exception e) { |
| U.warn(log, "Failed to marshal schema operation error [opId=" + opId + ", err=" + err + ']', e); |
| |
| try { |
| return U.marshal(marsh, new SchemaOperationException("Operation failed, but error cannot be " + |
| "serialized (see local node log for more details) [opId=" + opId + ", nodeId=" + |
| ctx.localNodeId() + ']')); |
| } |
| catch (Exception e0) { |
| assert false; // Impossible situation. |
| |
| return null; |
| } |
| } |
| } |
| |
| /** |
| * Unmarshal schema error. |
| * |
| * @param errBytes Error bytes. |
| * @return Error. |
| */ |
| @Nullable private SchemaOperationException unmarshalSchemaError(@Nullable byte[] errBytes) { |
| if (errBytes == null) |
| return null; |
| |
| try { |
| return U.unmarshal(marsh, errBytes, U.resolveClassLoader(ctx.config())); |
| } |
| catch (Exception e) { |
| return new SchemaOperationException("Operation failed, but error cannot be deserialized."); |
| } |
| } |
| |
| /** |
| * @return Value object context. |
| */ |
| public CacheQueryObjectValueContext objectContext() { |
| return valCtx; |
| } |
| |
| /** |
| * Performs validation of provided key and value against configured constraints. |
| * Throws runtime exception if validation fails. |
| * |
| * @param coctx Cache object context. |
| * @param key Key. |
| * @param val Value. |
| * @throws IgniteCheckedException If validation fails. |
| */ |
| public void validateKeyAndValue(CacheObjectContext coctx, KeyCacheObject key, CacheObject val) |
| throws IgniteCheckedException { |
| QueryTypeDescriptorImpl desc = typeByValue(coctx.cacheName(), coctx, key, val, true); |
| |
| if (desc == null) |
| return; |
| |
| desc.validateKeyAndValue(key, val); |
| } |
| |
| /** |
| * Performs necessary actions on disconnect of a stateful client (say, one associated with a transaction). |
| * |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void onClientDisconnect() throws IgniteCheckedException { |
| if (idx != null) |
| idx.onClientDisconnect(); |
| } |
| |
| /** |
| * @param ver Version. |
| */ |
| public static void setRequestAffinityTopologyVersion(AffinityTopologyVersion ver) { |
| requestTopVer.set(ver); |
| } |
| |
| /** |
| * @return Affinity topology version of the current request. |
| */ |
| public static AffinityTopologyVersion getRequestAffinityTopologyVersion() { |
| return requestTopVer.get(); |
| } |
| |
| /** |
| * Create missed cache type key. |
| * |
| * @param cacheName Cache name. |
| * @param typeId Type ID. |
| * @return Key. |
| */ |
| private static long missedCacheTypeKey(String cacheName, int typeId) { |
| // Pack the cache ID into the high 32 bits and the type ID into the low 32 bits. |
| // Mask the type ID to prevent sign extension from corrupting the high bits. |
| return ((long)CU.cacheId(cacheName) << 32) | (typeId & 0xFFFFFFFFL); |
| } |
| |
| /** |
| * @param key Key. |
| * @param cacheName Cache name. |
| * @return {@code True} if matches. |
| */ |
| private static boolean missedCacheTypeKeyMatches(long key, String cacheName) { |
| // Compare the high 32 bits of the key with the cache ID. A plain bitmask test |
| // would also match keys whose high bits are a strict superset of the cache ID bits. |
| return (int)(key >>> 32) == CU.cacheId(cacheName); |
| } |
| |
| /** |
| * Schema operation. |
| */ |
| private class SchemaOperation { |
| /** Original propose msg. */ |
| private final SchemaProposeDiscoveryMessage proposeMsg; |
| |
| /** Next schema operation. */ |
| private SchemaOperation next; |
| |
| /** Operation manager. */ |
| private SchemaOperationManager mgr; |
| |
| /** Finish message. */ |
| private SchemaFinishDiscoveryMessage finishMsg; |
| |
| /** Finish guard. */ |
| private final AtomicBoolean finishGuard = new AtomicBoolean(); |
| |
| /** |
| * Constructor. |
| * |
| * @param proposeMsg Original propose message. |
| */ |
| public SchemaOperation(SchemaProposeDiscoveryMessage proposeMsg) { |
| this.proposeMsg = proposeMsg; |
| } |
| |
| /** |
| * @return Operation ID. |
| */ |
| public UUID id() { |
| return proposeMsg.operation().id(); |
| } |
| |
| /** |
| * @return Original propose message. |
| */ |
| public SchemaProposeDiscoveryMessage proposeMessage() { |
| return proposeMsg; |
| } |
| |
| /** |
| * @return Next schema operation. |
| */ |
| @Nullable public SchemaOperation next() { |
| return next; |
| } |
| |
| /** |
| * @param next Next schema operation. |
| */ |
| public void next(SchemaOperation next) { |
| this.next = next; |
| } |
| |
| /** |
| * @param finishMsg Finish message. |
| */ |
| public void finishMessage(SchemaFinishDiscoveryMessage finishMsg) { |
| this.finishMsg = finishMsg; |
| } |
| |
| /** |
| * @return {@code True} if finish request already received. |
| */ |
| public boolean hasFinishMessage() { |
| return finishMsg != null; |
| } |
| |
| /** |
| * Handle finish message. |
| */ |
| @SuppressWarnings("unchecked") |
| public void doFinish() { |
| assert started(); |
| |
| if (!finishGuard.compareAndSet(false, true)) |
| return; |
| |
| final UUID opId = id(); |
| final String schemaName = proposeMsg.schemaName(); |
| |
| // Operation might be still in progress on client nodes which are not tracked by coordinator, |
| // so we chain to operation future instead of doing synchronous unwind. |
| mgr.worker().future().listen(new IgniteInClosure<IgniteInternalFuture>() { |
| @Override public void apply(IgniteInternalFuture fut) { |
| synchronized (stateMux) { |
| SchemaOperation op = schemaOps.remove(schemaName); |
| |
| assert op != null; |
| assert F.eq(op.id(), opId); |
| |
| // Complete client future (if any). |
| SchemaOperationClientFuture cliFut = schemaCliFuts.remove(opId); |
| |
| if (cliFut != null) { |
| if (finishMsg.hasError()) |
| cliFut.onDone(finishMsg.error()); |
| else |
| cliFut.onDone(); |
| } |
| |
| // Chain to the next operation (if any). |
| final SchemaOperation nextOp = op.next(); |
| |
| if (nextOp != null) { |
| schemaOps.put(schemaName, nextOp); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Next schema change operation started [opId=" + nextOp.id() + ']'); |
| |
| assert !nextOp.started(); |
| |
| // Cannot execute operation synchronously because it may cause starvation in exchange |
| // thread under load. Hence, moving short-lived operation to separate worker. |
| new IgniteThread(ctx.igniteInstanceName(), "schema-circuit-breaker-" + op.id(), |
| new Runnable() { |
| @Override public void run() { |
| onSchemaPropose(nextOp.proposeMessage()); |
| } |
| }).start(); |
| } |
| } |
| } |
| }); |
| } |
| |
| /** |
| * Unwind operation queue and get tail operation. |
| * |
| * @return Tail operation. |
| */ |
| public SchemaOperation unwind() { |
| if (next == null) |
| return this; |
| else |
| return next.unwind(); |
| } |
| |
| /** |
| * Whether operation started. |
| * |
| * @return {@code True} if started. |
| */ |
| public boolean started() { |
| return mgr != null; |
| } |
| |
| /** |
| * @return Operation manager. |
| */ |
| public SchemaOperationManager manager() { |
| return mgr; |
| } |
| |
| /** |
| * @param mgr Operation manager. |
| */ |
| public void manager(SchemaOperationManager mgr) { |
| assert this.mgr == null; |
| |
| this.mgr = mgr; |
| } |
| } |
| |
| /** |
| * Removes futures of index rebuilds that should have been completed on the exchange. |
| * |
| * @param fut Exchange future. |
| * @param cacheIds Cache ids for which futures will be deleted; |
| * if {@code null}, the ids are taken from {@code fut}. |
| */ |
| public void removeIndexRebuildFuturesOnExchange( |
| GridDhtPartitionsExchangeFuture fut, |
| @Nullable Set<Integer> cacheIds |
| ) { |
| idxRebuildFutStorage.cancelRebuildIndexesOnExchange( |
| cacheIds != null ? cacheIds : rebuildIndexCacheIds(fut), |
| fut.initialVersion() |
| ); |
| } |
| |
| /** |
| * Checks whether the indexes need to be rebuilt on the exchange. |
| * |
| * @param cacheId Cache id. |
| * @param fut Exchange future. |
| * @return {@code True} if need to rebuild. |
| */ |
| public boolean rebuildIndexOnExchange(int cacheId, GridDhtPartitionsExchangeFuture fut) { |
| return idxRebuildFutStorage.rebuildIndexesOnExchange(cacheId, fut.initialVersion()); |
| } |
| |
| /** |
| * Prepares index rebuild futures for caches. |
| * A future for a cache is added only if the previous one is missing or completed. |
| * |
| * @param cacheIds Cache ids. |
| * @return Cache ids for which futures have not been added. |
| */ |
| public Set<Integer> prepareRebuildIndexes(Set<Integer> cacheIds) { |
| return idxRebuildFutStorage.prepareRebuildIndexes(cacheIds, null); |
| } |
| |
| /** |
| * Gets cache ids for which indexes will need to be rebuilt on the exchange. |
| * |
| * @param fut Exchange future. |
| * @return Cache ids. |
| */ |
| private Set<Integer> rebuildIndexCacheIds(GridDhtPartitionsExchangeFuture fut) { |
| ExchangeActions acts = fut.exchangeActions(); |
| |
| Set<Integer> cacheIds = emptySet(); |
| |
| if (acts != null) { |
| if (!F.isEmpty(acts.cacheStartRequests())) { |
| cacheIds = acts.cacheStartRequests().stream() |
| .map(d -> CU.cacheId(d.request().cacheName())) |
| .collect(toSet()); |
| } |
| else if (acts.localJoinContext() != null && !F.isEmpty(acts.localJoinContext().caches())) { |
| cacheIds = acts.localJoinContext().caches().stream() |
| .map(t2 -> t2.get1().cacheId()) |
| .collect(toSet()); |
| } |
| } |
| |
| return cacheIds; |
| } |
| |
| /** |
| * Callback invoked when a cache index rebuild starts. |
| * <p/> |
| * Adds an entry indicating that a rebuild of the cache indexes is in progress. |
| * If the cache is persistent, the entry is also added to the MetaStorage. |
| * <p/> |
| * When the node is restarted or reactivated, this makes it possible to check |
| * whether the index rebuild has been {@link #rebuildIndexesCompleted}. |
| * |
| * @param cacheCtx Cache context. |
| * @param recreate {@code True} if index.bin is being recreated. |
| * @see #onFinishRebuildIndexes |
| * @see #rebuildIndexesCompleted |
| */ |
| public void onStartRebuildIndexes(GridCacheContext cacheCtx, boolean recreate) { |
| idxBuildStatusStorage.onStartRebuildIndexes(cacheCtx, recreate); |
| } |
| |
| /** |
| * Marks that index.bin recreation is in progress. |
| * |
| * @param cacheCtx Cache context. |
| */ |
| public void markIndexRecreate(GridCacheContext cacheCtx) { |
| idxBuildStatusStorage.markIndexRecreate(cacheCtx); |
| } |
| |
| /** |
| * Callback invoked when a cache index rebuild finishes. |
| * <p/> |
| * If the cache is persistent, the rebuild is marked as completed and the |
| * entry is deleted from the MetaStorage at the end of the checkpoint. |
| * Otherwise, the index rebuild entry is simply deleted. |
| * |
| * @param cacheCtx Cache context. |
| */ |
| public void onFinishRebuildIndexes(GridCacheContext cacheCtx) { |
| idxBuildStatusStorage.onFinishRebuildIndexes(cacheCtx.name()); |
| } |
| |
| /** |
| * Checks whether the index rebuild for the cache has completed. |
| * |
| * @param cacheCtx Cache context. |
| * @return {@code True} if completed. |
| */ |
| public boolean rebuildIndexesCompleted(GridCacheContext cacheCtx) { |
| return idxBuildStatusStorage.rebuildCompleted(cacheCtx.name()); |
| } |
| |
| /** |
| * @param cacheName Cache name. |
| * @return {@code True} if index.bin recreation has completed. |
| */ |
| public boolean recreateCompleted(String cacheName) { |
| return idxBuildStatusStorage.recreateCompleted(cacheName); |
| } |
| |
| /** |
| * Forcibly marks the index rebuild for the cache as completed. |
| * <p/> |
| * If the cache is persistent, the rebuild is marked as completed and the |
| * entry is deleted from the MetaStorage at the end of the checkpoint. |
| * Otherwise, the index rebuild entry is simply deleted. |
| * |
| * @param cacheName Cache name. |
| */ |
| public void completeRebuildIndexes(String cacheName) { |
| idxBuildStatusStorage.onFinishRebuildIndexes(cacheName); |
| } |
| |
| /** |
| * @return Index build status storage. |
| */ |
| public IndexBuildStatusStorage getIdxBuildStatusStorage() { |
| return idxBuildStatusStorage; |
| } |
| |
| /** |
| * @return Schema manager. |
| */ |
| public SchemaManager schemaManager() { |
| return schemaMgr; |
| } |
| |
| /** @return Statistics manager. */ |
| public IgniteStatisticsManager statsManager() { |
| return statsMgr; |
| } |
| } |