blob: 5e986d0775fd2a8bd80b206524a20f8e263b06e6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.h2.twostep;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
/**
* Class responsible for partition reservation for queries executed on local node. Prevents partitions from being
* evicted from node during query execution.
*/
public class PartitionReservationManager implements PartitionsExchangeAware {
/** Special instance of reservable object for REPLICATED caches. */
private static final ReplicatedReservable REPLICATED_RESERVABLE = new ReplicatedReservable();
/** Kernal context. */
private final GridKernalContext ctx;
/**
* Group reservations cache. When affinity version is not changed and all primary partitions must be reserved we get
* group reservation from this map instead of create new reservation group.
*/
private final ConcurrentMap<PartitionReservationKey, GridReservable> reservations = new ConcurrentHashMap<>();
/** Logger. */
private final IgniteLogger log;
/**
* Constructor.
*
* @param ctx Context.
*/
public PartitionReservationManager(GridKernalContext ctx) {
this.ctx = ctx;
log = ctx.log(PartitionReservationManager.class);
ctx.cache().context().exchange().registerExchangeAwareComponent(this);
}
/**
* Decide whether to ignore or proceed with lost partition.
*
* @param cctx Cache context.
* @param part Partition.
* @throws IgniteCheckedException If failed.
*/
private static void ignoreLostPartitionIfPossible(GridCacheContext cctx, GridDhtLocalPartition part)
throws IgniteCheckedException {
PartitionLossPolicy plc = cctx.config().getPartitionLossPolicy();
if (plc != null) {
if (plc == READ_ONLY_SAFE || plc == READ_WRITE_SAFE) {
throw new CacheInvalidStateException("Failed to execute query because cache partition has been " +
"lost [cacheName=" + cctx.name() + ", part=" + part + ']');
}
}
}
/**
* @param cctx Cache context.
* @param p Partition ID.
* @return Partition.
*/
private static GridDhtLocalPartition partition(GridCacheContext<?, ?> cctx, int p) {
return cctx.topology().localPartition(p, NONE, false);
}
/**
* @param cacheIds Cache IDs.
* @param reqTopVer Topology version from request.
* @param explicitParts Explicit partitions list.
* @param nodeId Node ID.
* @param reqId Request ID.
* @return String which is null in case of success or with causeMessage if failed
* @throws IgniteCheckedException If failed.
*/
public PartitionReservation reservePartitions(
@Nullable List<Integer> cacheIds,
AffinityTopologyVersion reqTopVer,
final int[] explicitParts,
UUID nodeId,
long reqId
) throws IgniteCheckedException {
assert reqTopVer != null;
AffinityTopologyVersion topVer = ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(reqTopVer);
if (F.isEmpty(cacheIds))
return new PartitionReservation(Collections.emptyList());
Collection<Integer> partIds;
if (explicitParts == null)
partIds = null;
else if (explicitParts.length == 0)
partIds = Collections.emptyList();
else {
partIds = new ArrayList<>(explicitParts.length);
for (int explicitPart : explicitParts)
partIds.add(explicitPart);
}
List<GridReservable> reserved = new ArrayList<>();
for (int i = 0; i < cacheIds.size(); i++) {
GridCacheContext<?, ?> cctx = ctx.cache().context().cacheContext(cacheIds.get(i));
// Cache was not found, probably was not deployed yet.
if (cctx == null) {
return new PartitionReservation(reserved,
String.format("Failed to reserve partitions for query (cache is not " +
"found on local node) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s]",
ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i)));
}
if (cctx.isLocal() || !cctx.rebalanceEnabled())
continue;
// For replicated cache topology version does not make sense.
final PartitionReservationKey grpKey = new PartitionReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer);
GridReservable r = reservations.get(grpKey);
if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits.
if (r != REPLICATED_RESERVABLE) {
if (!r.reserve())
return new PartitionReservation(reserved,
String.format("Failed to reserve partitions for query (group " +
"reservation failed) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
"cacheName=%s]", ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i), cctx.name()));
reserved.add(r);
}
}
else { // Try to reserve partitions one by one.
int partsCnt = cctx.affinity().partitions();
if (cctx.isReplicated()) { // Check all the partitions are in owning state for replicated cache.
if (r == null) { // Check only once.
for (int p = 0; p < partsCnt; p++) {
GridDhtLocalPartition part = partition(cctx, p);
// We don't need to reserve partitions because they will not be evicted in replicated caches.
GridDhtPartitionState partState = part != null ? part.state() : null;
if (partState != OWNING)
return new PartitionReservation(reserved,
String.format("Failed to reserve partitions for " +
"query (partition of REPLICATED cache is not in OWNING state) [" +
"localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
"cacheName=%s, part=%s, partFound=%s, partState=%s]",
ctx.localNodeId(),
nodeId,
reqId,
topVer,
cacheIds.get(i),
cctx.name(),
p,
(part != null),
partState
));
}
// Mark that we checked this replicated cache.
reservations.putIfAbsent(grpKey, REPLICATED_RESERVABLE);
}
}
else { // Reserve primary partitions for partitioned cache (if no explicit given).
if (explicitParts == null)
partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
int reservedCnt = 0;
for (int partId : partIds) {
GridDhtLocalPartition part = partition(cctx, partId);
GridDhtPartitionState partState = part != null ? part.state() : null;
if (partState != OWNING) {
if (partState == LOST)
ignoreLostPartitionIfPossible(cctx, part);
else {
return new PartitionReservation(reserved,
String.format("Failed to reserve partitions " +
"for query (partition of PARTITIONED cache is not found or not in OWNING " +
"state) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
"cacheName=%s, part=%s, partFound=%s, partState=%s]",
ctx.localNodeId(),
nodeId,
reqId,
topVer,
cacheIds.get(i),
cctx.name(),
partId,
(part != null),
partState
));
}
}
if (!part.reserve()) {
return new PartitionReservation(reserved,
String.format("Failed to reserve partitions for query " +
"(partition of PARTITIONED cache cannot be reserved) [" +
"localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
"cacheName=%s, part=%s, partFound=%s, partState=%s]",
ctx.localNodeId(),
nodeId,
reqId,
topVer,
cacheIds.get(i),
cctx.name(),
partId,
true,
partState
));
}
reserved.add(part);
reservedCnt++;
// Double check that we are still in owning state and partition contents are not cleared.
partState = part.state();
if (partState != OWNING) {
if (partState == LOST)
ignoreLostPartitionIfPossible(cctx, part);
else {
return new PartitionReservation(reserved,
String.format("Failed to reserve partitions for " +
"query (partition of PARTITIONED cache is not in OWNING state after " +
"reservation) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, " +
"cacheId=%s, cacheName=%s, part=%s, partState=%s]",
ctx.localNodeId(),
nodeId,
reqId,
topVer,
cacheIds.get(i),
cctx.name(),
partId,
partState
));
}
}
}
if (explicitParts == null && reservedCnt > 0) {
// We reserved all the primary partitions for cache, attempt to add group reservation.
GridDhtPartitionsReservation grp = new GridDhtPartitionsReservation(topVer, cctx, "SQL");
if (grp.register(reserved.subList(reserved.size() - reservedCnt, reserved.size()))) {
if (reservations.putIfAbsent(grpKey, grp) != null)
throw new IllegalStateException("Reservation already exists.");
grp.onPublish(new CI1<GridDhtPartitionsReservation>() {
@Override public void apply(GridDhtPartitionsReservation r) {
reservations.remove(grpKey, r);
}
});
}
}
}
}
}
return new PartitionReservation(reserved);
}
/**
* @param cacheName Cache name.
*/
public void onCacheStop(String cacheName) {
// Drop group reservations.
for (PartitionReservationKey grpKey : reservations.keySet()) {
if (F.eq(grpKey.cacheName(), cacheName))
reservations.remove(grpKey);
}
}
/**
* Cleanup group reservations cache on change affinity version.
*/
@Override public void onDoneAfterTopologyUnlock(final GridDhtPartitionsExchangeFuture fut) {
try {
// Must not do anything at the exchange thread. Dispatch to the management thread pool.
ctx.closure().runLocal(
new GridPlainRunnable() {
@Override public void run() {
AffinityTopologyVersion topVer = ctx.cache().context().exchange()
.lastAffinityChangedTopologyVersion(fut.topologyVersion());
reservations.forEach((key, r) -> {
if (r != REPLICATED_RESERVABLE && !F.eq(key.topologyVersion(), topVer)) {
assert r instanceof GridDhtPartitionsReservation;
((GridDhtPartitionsReservation)r).invalidate();
}
});
}
},
GridIoPolicy.MANAGEMENT_POOL);
}
catch (Throwable e) {
log.error("Unexpected exception on start reservations cleanup", e);
}
}
/**
* Mapper fake reservation object for replicated caches.
*/
private static class ReplicatedReservable implements GridReservable {
/** {@inheritDoc} */
@Override public boolean reserve() {
throw new IllegalStateException();
}
/** {@inheritDoc} */
@Override public void release() {
throw new IllegalStateException();
}
}
}