blob: df8af4811b7b3b52d2be2861f77be2f09b2b3816 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.version;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.util.typedef.internal.U;
import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
/**
* Makes sure that cache lock order values come in proper sequence.
* <p>
* NOTE: this class should not make use of any cache specific structures,
* like, for example GridCacheContext, as it may be reused between different
* caches.
*/
public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
/** */
public static final GridCacheVersion EVICT_VER = new GridCacheVersion(Integer.MAX_VALUE, 0, 0, 0);
/** */
public static final GridCacheVersion START_VER = new GridCacheVersion(0, 0, 0, 0);
/** Timestamp used as base time for cache topology version (January 1, 2014). */
public static final long TOP_VER_BASE_TIME = 1388520000000L;
/**
* Current order. Initialize to current time to make sure that
* local version increments even after restarts.
*/
private final AtomicLong order = new AtomicLong(U.currentTimeMillis());
/** Current order for store operations. */
private final AtomicLong loadOrder = new AtomicLong(0);
/** Last version. */
private volatile GridCacheVersion last;
/** Data center ID. */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private byte dataCenterId;
/** */
private long gridStartTime;
/** */
private GridCacheVersion ISOLATED_STREAMER_VER;
/** */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
assert evt.type() == EVT_NODE_METRICS_UPDATED;
DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
ClusterNode node = cctx.discovery().node(discoEvt.node().id());
if (node != null && !node.id().equals(cctx.localNodeId()))
onReceived(discoEvt.eventNode().id(), node.metrics().getLastDataVersion());
}
};
/** {@inheritDoc} */
@Override public void start0() throws IgniteCheckedException {
last = new GridCacheVersion(0, order.get(), 0, dataCenterId);
cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_METRICS_UPDATED);
}
/** {@inheritDoc} */
@Override protected void stop0(boolean cancel) {
cctx.gridEvents().removeLocalEventListener(discoLsnr, EVT_NODE_METRICS_UPDATED);
}
/**
* Sets data center ID.
*
* @param dataCenterId Data center ID.
*/
public void dataCenterId(byte dataCenterId) {
this.dataCenterId = dataCenterId;
last = new GridCacheVersion(0, order.get(), 0, dataCenterId);
}
/**
* @param nodeId Node ID.
* @param ver Remote version.
*/
public void onReceived(UUID nodeId, GridCacheVersion ver) {
onReceived(nodeId, ver.order());
}
/**
* @param nodeId Node ID.
* @param ver Remote version.
*/
public void onReceived(UUID nodeId, long ver) {
if (ver > 0) {
while (true) {
long order = this.order.get();
// If another version is larger, we update.
if (ver > order) {
if (!this.order.compareAndSet(order, ver))
// Try again.
continue;
else if (log.isDebugEnabled())
log.debug("Updated version from node [nodeId=" + nodeId + ", ver=" + ver + ']');
}
else if (log.isDebugEnabled()) {
log.debug("Did not update version from node (version has lower order) [nodeId=" + nodeId +
", ver=" + ver + ", curOrder=" + this.order + ']');
}
break;
}
}
}
/**
* @param rcvOrder Received order.
*/
public void onExchange(long rcvOrder) {
long order;
while (true) {
order = this.order.get();
if (rcvOrder > order) {
if (this.order.compareAndSet(order, rcvOrder))
break;
}
else
break;
}
}
/**
* @param nodeId Node ID.
* @param ver Received version.
* @return Next version.
*/
public GridCacheVersion onReceivedAndNext(UUID nodeId, GridCacheVersion ver) {
onReceived(nodeId, ver);
return next(ver);
}
/**
* Version for entries loaded with isolated streamer, should be less than any version generated
* for entries update.
*
* @return Version for entries loaded with isolated streamer.
*/
public GridCacheVersion isolatedStreamerVersion() {
if (ISOLATED_STREAMER_VER == null) {
long topVer = 1;
if (gridStartTime == 0)
gridStartTime = cctx.kernalContext().discovery().gridStartTime();
topVer += (gridStartTime - TOP_VER_BASE_TIME) / 1000;
ISOLATED_STREAMER_VER = new GridCacheVersion((int)topVer, 0, 1, dataCenterId);
}
return ISOLATED_STREAMER_VER;
}
/**
* @return Next version based on current topology.
*/
public GridCacheVersion next() {
return next(cctx.kernalContext().discovery().topologyVersion(), true, false, dataCenterId);
}
/**
* @param dataCenterId Data center id.
* @return Next version based on current topology with given data center id.
*/
public GridCacheVersion next(byte dataCenterId) {
return next(cctx.kernalContext().discovery().topologyVersion(), true, false, dataCenterId);
}
/**
* Gets next version based on given topology version. Given value should be
* real topology version calculated as number of grid topology changes and
* obtained from discovery manager.
*
* @param topVer Topology version for which new version should be obtained.
* @return Next version based on given topology version.
*/
public GridCacheVersion next(AffinityTopologyVersion topVer) {
return next(topVer.topologyVersion(), true, false, dataCenterId);
}
/**
* Gets next version for cache store load and reload operations.
*
* @return Next version for cache store operations.
*/
public GridCacheVersion nextForLoad() {
return next(cctx.kernalContext().discovery().topologyVersion(), true, true, dataCenterId);
}
/**
* Gets next version for cache store load and reload operations.
*
* @return Next version for cache store operations.
*/
public GridCacheVersion nextForLoad(AffinityTopologyVersion topVer) {
return next(topVer.topologyVersion(), true, true, dataCenterId);
}
/**
* Gets next version for cache store load and reload operations.
*
* @return Next version for cache store operations.
*/
public GridCacheVersion nextForLoad(GridCacheVersion ver) {
return next(ver.topologyVersion(), false, true, dataCenterId);
}
/**
* Gets next version based on given cache version.
*
* @param ver Cache version for which new version should be obtained.
* @return Next version based on given cache version.
*/
public GridCacheVersion next(GridCacheVersion ver) {
return next(ver.topologyVersion(), false, false, dataCenterId);
}
/**
* The version is generated by taking last order plus one and random {@link UUID}.
* Such algorithm ensures that lock IDs constantly grow in value and older
* lock IDs are smaller than new ones. Therefore, older lock IDs appear
* in the pending set before newer ones, hence preventing starvation.
*
* @param topVer Topology version for which new version should be obtained.
* @param addTime If {@code true} then adds to the given topology version number of seconds
* from the start time of the first grid node.
* @param dataCenterId Data center id.
* @return New lock order.
*/
private GridCacheVersion next(long topVer, boolean addTime, boolean forLoad, byte dataCenterId) {
if (topVer == -1)
topVer = cctx.kernalContext().discovery().topologyVersion();
if (addTime) {
if (gridStartTime == 0)
gridStartTime = cctx.kernalContext().discovery().gridStartTime();
topVer += (gridStartTime - TOP_VER_BASE_TIME) / 1000;
}
int locNodeOrder = (int)cctx.localNode().order();
long ord = forLoad ? loadOrder.incrementAndGet() : order.incrementAndGet();
GridCacheVersion next = new GridCacheVersion(
(int)topVer,
ord,
locNodeOrder,
dataCenterId);
last = next;
return next;
}
/**
* Gets last generated version without generating a new one.
*
* @return Last generated version.
*/
public GridCacheVersion last() {
return last;
}
}