blob: d522eb420c815731362d6bd39b10dfaada6359ce [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
/**
* Reservation mechanism for multiple partitions allowing to do a reservation in one operation.
*/
public class GridDhtPartitionsReservation implements GridReservable {
/** */
private static final GridDhtLocalPartition[] EMPTY = {};
/** */
private static final CI1<GridDhtPartitionsReservation> NO_OP = new CI1<GridDhtPartitionsReservation>() {
@Override public void apply(GridDhtPartitionsReservation gridDhtPartitionsReservation) {
throw new IllegalStateException();
}
};
/** */
private final Object appKey;
/** */
private final GridCacheContext<?,?> cctx;
/** */
private final AffinityTopologyVersion topVer;
/** */
private final AtomicReference<GridDhtLocalPartition[]> parts = new AtomicReference<>();
/** */
private final AtomicReference<CI1<GridDhtPartitionsReservation>> unpublish = new AtomicReference<>();
/** */
private final AtomicInteger reservations = new AtomicInteger();
/**
* @param topVer AffinityTopologyVersion version.
* @param cctx Cache context.
* @param appKey Application key for reservation.
*/
public GridDhtPartitionsReservation(AffinityTopologyVersion topVer, GridCacheContext<?,?> cctx, Object appKey) {
assert topVer != null;
assert cctx != null;
assert appKey != null;
this.topVer = cctx.shared().exchange().lastAffinityChangedTopologyVersion(topVer);
this.cctx = cctx;
this.appKey = appKey;
}
/**
* Registers all the given partitions for this reservation.
*
* @param parts Partitions.
* @return {@code true} If registration succeeded and this reservation can be published.
*/
public boolean register(Collection<? extends GridReservable> parts) {
assert !F.isEmpty(parts) : "empty partitions list";
GridDhtLocalPartition[] arr = new GridDhtLocalPartition[parts.size()];
int i = 0;
int prevPart = -1;
boolean sorted = true; // Most probably it is a sorted list.
for (GridReservable part : parts) {
arr[i] = (GridDhtLocalPartition)part;
if (sorted) { // Make sure it will be a sorted array.
int id = arr[i].id();
if (id <= prevPart)
sorted = false;
prevPart = id;
}
i++;
}
if (!sorted)
Arrays.sort(arr);
i = 0;
prevPart = -1;
// Register in correct sort order.
for (GridDhtLocalPartition part : arr) {
if (prevPart == part.id())
throw new IllegalStateException("Duplicated partitions.");
prevPart = part.id();
if (!part.addReservation(this)) {
if (i != 0)
throw new IllegalStateException(
"Trying to reserve different sets of partitions for the same topology version.");
return false;
}
i++;
}
if (!this.parts.compareAndSet(null, arr))
throw new IllegalStateException("Partitions can be registered only once.");
assert reservations.get() != -1 : "all the partitions must be reserved before register, we can't be invalidated";
return true;
}
/**
* Must be called when this reservation is published.
*
* @param unpublish Closure to unpublish this reservation when it will become invalid.
*/
public void onPublish(CI1<GridDhtPartitionsReservation> unpublish) {
assert unpublish != null;
if (!this.unpublish.compareAndSet(null, unpublish))
throw new IllegalStateException("Unpublishing closure can be set only once.");
if (reservations.get() == -1)
unregister();
}
/**
* Reserves all the registered partitions.
*
* @return {@code true} If succeeded.
*/
@Override public boolean reserve() {
assert parts.get() != null : "partitions must be registered before the first reserve attempt";
for (;;) {
int r = reservations.get();
if (r == -1) // Invalidated.
return false;
assert r >= 0 : r;
if (reservations.compareAndSet(r, r + 1))
return true;
}
}
/**
* @param parts Partitions.
*/
private static void tryContinueClearing(GridDhtLocalPartition[] parts) {
if (parts == null) // Can be not initialized yet.
return;
for (GridDhtLocalPartition part : parts)
part.tryContinueClearing();
}
/**
* Releases all the registered partitions.
*/
@Override public void release() {
for (;;) {
int r = reservations.get();
if (r <= 0)
throw new IllegalStateException("Method 'reserve' must be called before 'release'.");
if (reservations.compareAndSet(r, r - 1)) {
// If it was the last reservation and topology version changed -> attempt to evict partitions.
if (r == 1 && !cctx.kernalContext().isStopping() &&
!topVer.equals(cctx.shared().exchange().lastAffinityChangedTopologyVersion(
cctx.topology().lastTopologyChangeVersion())))
tryContinueClearing(parts.get());
return;
}
}
}
/**
* Unregisters from all the partitions and unpublishes this reservation.
*/
private void unregister() {
GridDhtLocalPartition[] arr = parts.get();
// Unregister from partitions.
if (!F.isEmpty(arr) && parts.compareAndSet(arr, EMPTY)) {
// Reverse order makes sure that addReservation on the same topVer
// reservation will fail on the first partition.
for (int i = arr.length - 1; i >= 0; i--) {
GridDhtLocalPartition part = arr[i];
part.removeReservation(this);
}
}
// Unpublish.
CI1<GridDhtPartitionsReservation> u = unpublish.get();
if (u != null && u != NO_OP && unpublish.compareAndSet(u, NO_OP))
u.apply(this);
}
/**
* If returns {@code true} this reservation object becomes invalid and partitions
* can be evicted or at least cleared.
* Also this means that after returning {@code true} here method {@link #reserve()} can not
* return {@code true} anymore.
*
* @return {@code true} If this reservation was successfully invalidated because it was not
* reserved and partitions can be evicted.
*/
public boolean invalidate() {
assert parts.get() != null : "all parts must be reserved before registration";
int r = reservations.get();
assert r >= -1 : r;
if (r != 0)
return r == -1;
if (reservations.compareAndSet(0, -1)) {
unregister();
return true;
}
return false;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
GridDhtPartitionsReservation that = (GridDhtPartitionsReservation)o;
return cctx == that.cctx && topVer.equals(that.topVer) && appKey.equals(that.appKey);
}
/** {@inheritDoc} */
@Override public int hashCode() {
String name = cctx.name();
int result = name == null ? 0 : name.hashCode();
result = 31 * result + appKey.hashCode();
result = 31 * result + topVer.hashCode();
return result;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(null,
"cache", cctx.name(), false,
"partitions", Arrays.toString(parts.get()), false,
"topology", topVer.toString(), false);
}
}