| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed.dht; |
| |
| import java.util.UUID; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; |
| import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.internal.processors.query.GridQueryCancel; |
| import org.apache.ignite.internal.processors.query.UpdateSourceIterator; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteUuid; |
| |
| /** |
| * Cache query lock future. |
| */ |
| public final class GridDhtTxQueryEnlistFuture extends GridDhtTxQueryAbstractEnlistFuture { |
| /** Involved cache ids. */ |
| private final int[] cacheIds; |
| |
| /** Schema name. */ |
| private final String schema; |
| |
| /** Query string. */ |
| private final String qry; |
| |
| /** Query partitions. */ |
| private final int[] parts; |
| |
| /** Query parameters. */ |
| private final Object[] params; |
| |
| /** Flags. */ |
| private final int flags; |
| |
| /** Fetch page size. */ |
| private final int pageSize; |
| |
| /** |
| * @param nearNodeId Near node ID. |
| * @param nearLockVer Near lock version. |
| * @param mvccSnapshot Mvcc snapshot. |
| * @param threadId Thread ID. |
| * @param nearFutId Near future id. |
| * @param nearMiniId Near mini future id. |
| * @param tx Transaction. |
| * @param cacheIds Involved cache ids. |
| * @param parts Partitions. |
| * @param schema Schema name. |
| * @param qry Query string. |
| * @param params Query parameters. |
| * @param flags Flags. |
| * @param pageSize Fetch page size. |
| * @param timeout Lock acquisition timeout. |
| * @param cctx Cache context. |
| */ |
| public GridDhtTxQueryEnlistFuture( |
| UUID nearNodeId, |
| GridCacheVersion nearLockVer, |
| MvccSnapshot mvccSnapshot, |
| long threadId, |
| IgniteUuid nearFutId, |
| int nearMiniId, |
| GridDhtTxLocalAdapter tx, |
| int[] cacheIds, |
| int[] parts, |
| String schema, |
| String qry, |
| Object[] params, |
| int flags, |
| int pageSize, |
| long timeout, |
| GridCacheContext<?, ?> cctx) { |
| super(nearNodeId, |
| nearLockVer, |
| mvccSnapshot, |
| threadId, |
| nearFutId, |
| nearMiniId, |
| tx, |
| timeout, |
| cctx); |
| |
| assert timeout >= 0; |
| assert nearNodeId != null; |
| assert nearLockVer != null; |
| assert threadId == tx.threadId(); |
| |
| this.cacheIds = cacheIds; |
| this.schema = schema; |
| this.qry = qry; |
| this.params = params; |
| this.flags = flags; |
| this.pageSize = pageSize; |
| |
| this.parts = calculatePartitions(tx, parts, cctx); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected UpdateSourceIterator<?> createIterator() throws IgniteCheckedException { |
| checkPartitions(parts); |
| |
| return cctx.kernalContext().query().executeUpdateOnDataNodeTransactional( |
| cctx, |
| cacheIds, |
| parts, |
| schema, |
| qry, |
| params, |
| flags, |
| pageSize, |
| (int)timeout, |
| tx.topologyVersionSnapshot(), |
| mvccSnapshot, |
| new GridQueryCancel() |
| ); |
| } |
| |
| /** |
| * Checks whether all the necessary partitions are in {@link GridDhtPartitionState#OWNING} state. |
| * |
| * @param parts Partitions. |
| * @throws ClusterTopologyCheckedException If failed. |
| */ |
| @SuppressWarnings("ForLoopReplaceableByForEach") |
| private void checkPartitions(int[] parts) throws ClusterTopologyCheckedException { |
| if (cctx.isLocal() || !cctx.rebalanceEnabled()) |
| return; |
| |
| GridDhtPartitionTopology top = cctx.topology(); |
| |
| try { |
| top.readLock(); |
| |
| for (int i = 0; i < parts.length; i++) { |
| GridDhtLocalPartition p = top.localPartition(parts[i]); |
| |
| if (p == null || p.state() != GridDhtPartitionState.OWNING) { |
| throw new ClusterTopologyCheckedException("Cannot run update query. " + |
| "Node must own all the necessary partitions."); |
| } |
| } |
| } |
| finally { |
| top.readUnlock(); |
| } |
| } |
| |
| /** */ |
| private int[] calculatePartitions(GridDhtTxLocalAdapter tx, int[] parts, GridCacheContext<?, ?> cctx) { |
| if (parts == null) |
| parts = U.toIntArray( |
| cctx.affinity() |
| .primaryPartitions(cctx.localNodeId(), tx.topologyVersionSnapshot())); |
| |
| return parts; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(GridDhtTxQueryEnlistFuture.class, this); |
| } |
| } |