blob: 89a23005f5989f3f8783fcb41b26d2955e7a14fd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.mvcc;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.NotNull;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.noCoordinatorError;
/**
* Tracker used for an optimistic tx and not-in-tx queries.
*/
@SuppressWarnings("unchecked")
public class MvccQueryTrackerImpl implements MvccQueryTracker {
/** */
@GridToStringExclude
private final GridCacheContext cctx;
/** */
@GridToStringExclude
private final IgniteLogger log;
/** */
private long crdVer;
/** */
private final long id;
/** */
private Object state;
/** */
private volatile AffinityTopologyVersion topVer;
/** */
private boolean done;
/**
* @param cctx Cache context.
*/
public MvccQueryTrackerImpl(GridCacheContext cctx) {
this.cctx = cctx;
this.id = ID_CNTR.incrementAndGet();
log = cctx.logger(getClass());
}
/** {@inheritDoc} */
@Override public long id() {
return id;
}
/** {@inheritDoc} */
@Override public MvccSnapshot snapshot() {
Object state0;
synchronized (this) {
state0 = state;
}
return snapshot(state0);
}
/** */
private MvccSnapshot snapshot(Object state) {
if (state != null && state.getClass() == SnapshotFuture.class)
return ((SnapshotFuture)state).result();
else
return (MvccSnapshot)state;
}
/** {@inheritDoc} */
@Override public GridCacheContext context() {
return cctx;
}
/** {@inheritDoc} */
@Override public AffinityTopologyVersion topologyVersion() {
return topVer;
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<MvccSnapshot> requestSnapshot() {
SnapshotFuture fut;
synchronized (this) {
if (state == null)
state = fut = new SnapshotFuture();
else if (state.getClass() == SnapshotFuture.class)
return (IgniteInternalFuture<MvccSnapshot>)state;
else
return new GridFinishedFuture<>((MvccSnapshot)state);
}
requestSnapshot0(cctx.shared().exchange().readyAffinityVersion(), fut);
return fut;
}
/** {@inheritDoc} */
@Override public void onDone() {
Object state0;
synchronized (this) {
if (done)
return;
state0 = state;
done = true;
}
cctx.shared().coordinators().removeQueryTracker(id);
if (state0 != null && state0.getClass() == SnapshotFuture.class)
((SnapshotFuture)state0).cancel();
else
ackQueryDone((MvccSnapshot)state0);
}
/** {@inheritDoc} */
@Override public synchronized long onMvccCoordinatorChange(@NotNull MvccCoordinator newCrd) {
if (snapshot(state) != null) {
assert crdVer != 0 : this;
if (crdVer != newCrd.version()) {
crdVer = newCrd.version();
return id;
}
else
return MVCC_TRACKER_ID_NA;
}
else if (crdVer != 0)
crdVer = 0; // Mark for remap.
return MVCC_TRACKER_ID_NA;
}
/** */
private void requestSnapshot0(@NotNull AffinityTopologyVersion topVer, @NotNull MvccSnapshotResponseListener lsnr) {
MvccCoordinator crd = coordinator(); AffinityTopologyVersion crdTopVer = crd.topologyVersion();
if (!crdTopVer.initialized())
lsnr.onError(noCoordinatorError());
else if (crdTopVer.compareTo(topVer) <= 0) {
synchronized (this) {
if (done)
return;
this.crdVer = crd.version();
}
this.topVer = topVer;
cctx.shared().coordinators().addQueryTracker(this);
cctx.shared().coordinators().requestReadSnapshotAsync(crd, lsnr);
}
else
remap(crdTopVer, lsnr);
}
/**
* @param res Response.
* @param lsnr Response listener.
* @return {@code false} if need to remap.
*/
private boolean onResponse0(@NotNull MvccSnapshot res, @NotNull MvccSnapshotResponseListener lsnr) {
boolean ackQueryDone = false, needRemap = false;
synchronized (this) {
assert snapshot(state) == null : "[this=" + this + ", rcvdVer=" + res + "]";
if (!done && crdVer != 0) {
this.state = res;
return true;
}
if (crdVer != 0)
ackQueryDone = true;
else if (!done)
needRemap = true;
}
if (needRemap) // Coordinator is failed or reassigned, need remap.
tryRemap(coordinator().topologyVersion(), lsnr);
else if (ackQueryDone) // Coordinator is not failed, but the tracker is already closed.
ackQueryDone(res);
return false;
}
/**
* @param e Exception.
* @param lsnr Response listener.
* @return {@code false} if need to remap.
*/
private boolean onError0(IgniteCheckedException e, @NotNull MvccSnapshotResponseListener lsnr) {
synchronized (this) {
if (done)
return false;
}
if (e instanceof ClusterTopologyCheckedException
&& !(e instanceof ClusterTopologyServerNotFoundException)) {
tryRemap(coordinator().topologyVersion(), lsnr);
return false;
}
cctx.shared().coordinators().removeQueryTracker(id);
return true;
}
/** */
private void tryRemap(@NotNull AffinityTopologyVersion mapVer, @NotNull MvccSnapshotResponseListener lsnr) {
if (!mapVer.initialized())
lsnr.onError(noCoordinatorError());
else
remap(mapVer, lsnr);
}
/** */
private void remap(@NotNull AffinityTopologyVersion mapVer, @NotNull MvccSnapshotResponseListener lsnr) {
if (log.isDebugEnabled())
log.debug("Mvcc coordinator failed or reassigned, need remap.");
AffinityTopologyVersion topVer0 = topVer;
if (topVer0 != null && topVer0.compareTo(mapVer) >= 0)
mapVer = topVer0.nextMinorVersion();
// Topology version can grow only.
assert topVer0 == null || mapVer.compareTo(topVer0) > 0 :
"topVer=" + topVer0 + ", nextTopVer=" + mapVer;
IgniteInternalFuture<AffinityTopologyVersion> readyFut =
cctx.shared().exchange().affinityReadyFuture(mapVer);
assert readyFut != null; // Cannot be null.
if (readyFut.isDone())
onAffinityReady(readyFut, lsnr);
else
readyFut.listen(fut -> onAffinityReady(fut, lsnr));
}
/** */
private void onAffinityReady(@NotNull IgniteInternalFuture<AffinityTopologyVersion> readyFut,
@NotNull MvccSnapshotResponseListener lsnr) {
try {
AffinityTopologyVersion mapVer = readyFut.get();
if (log.isDebugEnabled())
log.debug("Remap on new topology: " + mapVer);
requestSnapshot0(mapVer, lsnr);
}
catch (IgniteCheckedException e) {
lsnr.onError(e);
}
}
/** */
private void ackQueryDone(MvccSnapshot snapshot) {
if (snapshot != null)
cctx.shared().coordinators().ackQueryDone(snapshot, id);
}
/** */
@NotNull private MvccCoordinator coordinator() {
return cctx.shared().coordinators().currentCoordinator();
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(MvccQueryTrackerImpl.class, this);
}
/** */
private final class SnapshotFuture extends MvccSnapshotFuture {
/** */
private SnapshotFuture() {}
/** {@inheritDoc} */
@Override public void onResponse(MvccSnapshot res) {
if (onResponse0(res, this))
super.onResponse(res);
}
/** {@inheritDoc} */
@Override public void onError(IgniteCheckedException e) {
if (onError0(e, this))
super.onError(e);
}
/** {@inheritDoc} */
@Override public boolean cancel() {
return onCancelled();
}
}
}