blob: f475efa48f4c56795a918a77a7713ba11a7f217e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
/**
* Cache query lock future.
*/
public final class GridDhtTxQueryEnlistFuture extends GridDhtTxQueryAbstractEnlistFuture {
/** Involved cache ids. */
private final int[] cacheIds;
/** Schema name. */
private final String schema;
/** Query string. */
private final String qry;
/** Query partitions. */
private final int[] parts;
/** Query parameters. */
private final Object[] params;
/** Flags. */
private final int flags;
/** Fetch page size. */
private final int pageSize;
/**
* @param nearNodeId Near node ID.
* @param nearLockVer Near lock version.
* @param mvccSnapshot Mvcc snapshot.
* @param threadId Thread ID.
* @param nearFutId Near future id.
* @param nearMiniId Near mini future id.
* @param tx Transaction.
* @param cacheIds Involved cache ids.
* @param parts Partitions.
* @param schema Schema name.
* @param qry Query string.
* @param params Query parameters.
* @param flags Flags.
* @param pageSize Fetch page size.
* @param timeout Lock acquisition timeout.
* @param cctx Cache context.
*/
public GridDhtTxQueryEnlistFuture(
UUID nearNodeId,
GridCacheVersion nearLockVer,
MvccSnapshot mvccSnapshot,
long threadId,
IgniteUuid nearFutId,
int nearMiniId,
GridDhtTxLocalAdapter tx,
int[] cacheIds,
int[] parts,
String schema,
String qry,
Object[] params,
int flags,
int pageSize,
long timeout,
GridCacheContext<?, ?> cctx) {
super(nearNodeId,
nearLockVer,
mvccSnapshot,
threadId,
nearFutId,
nearMiniId,
tx,
timeout,
cctx);
assert timeout >= 0;
assert nearNodeId != null;
assert nearLockVer != null;
assert threadId == tx.threadId();
this.cacheIds = cacheIds;
this.schema = schema;
this.qry = qry;
this.params = params;
this.flags = flags;
this.pageSize = pageSize;
this.parts = calculatePartitions(tx, parts, cctx);
}
/** {@inheritDoc} */
@Override protected UpdateSourceIterator<?> createIterator() throws IgniteCheckedException {
checkPartitions(parts);
return cctx.kernalContext().query().executeUpdateOnDataNodeTransactional(
cctx,
cacheIds,
parts,
schema,
qry,
params,
flags,
pageSize,
(int)timeout,
tx.topologyVersionSnapshot(),
mvccSnapshot,
new GridQueryCancel()
);
}
/**
* Checks whether all the necessary partitions are in {@link GridDhtPartitionState#OWNING} state.
*
* @param parts Partitions.
* @throws ClusterTopologyCheckedException If failed.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
private void checkPartitions(int[] parts) throws ClusterTopologyCheckedException {
if (!cctx.rebalanceEnabled())
return;
GridDhtPartitionTopology top = cctx.topology();
try {
top.readLock();
for (int i = 0; i < parts.length; i++) {
GridDhtLocalPartition p = top.localPartition(parts[i]);
if (p == null || p.state() != GridDhtPartitionState.OWNING) {
throw new ClusterTopologyCheckedException("Cannot run update query. " +
"Node must own all the necessary partitions.");
}
}
}
finally {
top.readUnlock();
}
}
/** */
private int[] calculatePartitions(GridDhtTxLocalAdapter tx, int[] parts, GridCacheContext<?, ?> cctx) {
if (parts == null)
parts = U.toIntArray(
cctx.affinity()
.primaryPartitions(cctx.localNodeId(), tx.topologyVersionSnapshot()));
return parts;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtTxQueryEnlistFuture.class, this);
}
}