blob: 2636f8ff21c1692cce8307a50c3a5524c56287a2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.version;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.util.typedef.internal.U;
import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
import static org.apache.ignite.internal.processors.cache.CacheMetricsImpl.CACHE_METRICS;
/**
* Makes sure that cache lock order values come in proper sequence.
* <p>
* NOTE: this class should not make use of any cache specific structures,
* like, for example GridCacheContext, as it may be reused between different
* caches.
*/
public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
/** */
public static final GridCacheVersion EVICT_VER = new GridCacheVersion(Integer.MAX_VALUE, 0, 0, 0);
/** Timestamp used as base time for cache topology version (January 1, 2014). */
public static final long TOP_VER_BASE_TIME = 1388520000000L;
/** Last data version metric name. */
public static final String LAST_DATA_VER = "LastDataVersion";
/** Cluster ID metric name. */
public static final String DATA_VER_CLUSTER_ID = "DataVersionClusterId";
/** Last version metric. */
protected AtomicLongMetric lastDataVer;
/**
* Current order. Initialize to current time to make sure that
* local version increments even after restarts.
*/
private final AtomicLong order = new AtomicLong(U.currentTimeMillis());
/** Current order for store operations. */
private final AtomicLong loadOrder = new AtomicLong(0);
/** Entry start version. */
private GridCacheVersion startVer;
/** Last version. */
private volatile GridCacheVersion last;
/** Data center ID. */
private byte dataCenterId;
/**
* Offset in seconds from {@link #TOP_VER_BASE_TIME}.
* Added to version to make sure it grows after restart.
*/
private volatile int offset;
/** */
private volatile GridCacheVersion isolatedStreamerVer;
/** */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
assert evt.type() == EVT_NODE_METRICS_UPDATED;
DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
ClusterNode node = cctx.discovery().node(discoEvt.node().id());
if (node != null && !node.id().equals(cctx.localNodeId()))
onReceived(discoEvt.eventNode().id(), node.metrics().getLastDataVersion());
}
};
/** {@inheritDoc} */
@Override public void start0() throws IgniteCheckedException {
MetricRegistryImpl sysreg = cctx.kernalContext().metric().registry(CACHE_METRICS);
lastDataVer = sysreg.longMetric(LAST_DATA_VER, "The latest data version on the node.");
sysreg.register(DATA_VER_CLUSTER_ID, () -> dataCenterId, "Data version cluster id.");
startVer = new GridCacheVersion(0, 0, 0, dataCenterId);
cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_METRICS_UPDATED);
}
/** {@inheritDoc} */
@Override protected void stop0(boolean cancel) {
cctx.gridEvents().removeLocalEventListener(discoLsnr, EVT_NODE_METRICS_UPDATED);
}
/**
* @param topVer Topology version.
*/
public void onLocalJoin(long topVer) {
long startTime = cctx.kernalContext().discovery().gridStartTime();
if (startTime != 0)
offset = (int)((startTime - TOP_VER_BASE_TIME) / 1000);
last = new GridCacheVersion(0, order.get(), 0, dataCenterId);
lastDataVer.value(last.order());
isolatedStreamerVer = new GridCacheVersion(1 + offset, 0, 1, dataCenterId);
}
/**
* Sets data center ID.
*
* @param dataCenterId Data center ID.
*/
public void dataCenterId(byte dataCenterId) {
this.dataCenterId = dataCenterId;
startVer = new GridCacheVersion(0, 0, 0, dataCenterId);
}
/** @return Data center ID. */
public byte dataCenterId() {
return dataCenterId;
}
/**
* @param nodeId Node ID.
* @param ver Remote version.
*/
public void onReceived(UUID nodeId, GridCacheVersion ver) {
onReceived(nodeId, ver.order());
}
/**
* @param nodeId Node ID.
* @param ver Remote version.
*/
public void onReceived(UUID nodeId, long ver) {
if (ver > 0) {
while (true) {
long order = this.order.get();
// If another version is larger, we update.
if (ver > order) {
if (!this.order.compareAndSet(order, ver))
// Try again.
continue;
else if (log.isDebugEnabled())
log.debug("Updated version from node [nodeId=" + nodeId + ", ver=" + ver + ']');
}
else if (log.isDebugEnabled()) {
log.debug("Did not update version from node (version has lower order) [nodeId=" + nodeId +
", ver=" + ver + ", curOrder=" + this.order + ']');
}
break;
}
}
}
/**
* @param rcvOrder Received order.
*/
public void onExchange(long rcvOrder) {
long order;
while (true) {
order = this.order.get();
if (rcvOrder > order) {
if (this.order.compareAndSet(order, rcvOrder))
break;
}
else
break;
}
}
/**
* @param nodeId Node ID.
* @param ver Received version.
* @return Next version.
*/
public GridCacheVersion onReceivedAndNext(UUID nodeId, GridCacheVersion ver) {
onReceived(nodeId, ver);
return next(ver);
}
/**
* Version for entries loaded with isolated streamer, should be less than any version generated
* for entries update.
*
* @return Version for entries loaded with isolated streamer.
*/
public GridCacheVersion isolatedStreamerVersion() {
return isolatedStreamerVer;
}
/**
* Gets next version based on given topology version. Given value should be
* real topology version calculated as number of grid topology changes and
* obtained from discovery manager.
*
* @param topVer Topology version for which new version should be obtained.
* @return Next version based on given topology version.
*/
public GridCacheVersion next(long topVer) {
return next(topVer + offset, cctx.localNode().order(), dataCenterId);
}
/**
* Gets next version based on given topology version. Given value should be
* real topology version calculated as number of grid topology changes and
* obtained from discovery manager.
*
* @param topVer Topology version for which new version should be obtained.
* @param dataCenterId Data center id.
* @return Next version based on given topology version.
*/
public GridCacheVersion next(long topVer, byte dataCenterId) {
return next(topVer + offset, cctx.localNode().order(), dataCenterId);
}
/**
* Gets next version based on given cache version.
*
* @param ver Cache version for which new version should be obtained.
* @return Next version based on given cache version.
*/
public GridCacheVersion next(GridCacheVersion ver) {
return next(ver.topologyVersion(), cctx.localNode().order(), dataCenterId);
}
/**
* Gets next version for cache store load and reload operations.
*
* @return Next version for cache store operations.
*/
public GridCacheVersion nextForLoad() {
return nextForLoad(cctx.kernalContext().discovery().topologyVersion() + offset,
cctx.localNode().order(),
dataCenterId);
}
/**
* Gets next version for cache store load and reload operations.
*
* @param topVer Topology version for which new version should be obtained.
* @return Next version for cache store operations.
*/
public GridCacheVersion nextForLoad(long topVer) {
return nextForLoad(topVer + offset, cctx.localNode().order(), dataCenterId);
}
/**
* Gets next version for cache store load and reload operations.
*
* @return Next version for cache store operations.
*/
public GridCacheVersion nextForLoad(GridCacheVersion ver) {
return nextForLoad(ver.topologyVersion(), cctx.localNode().order(), dataCenterId);
}
/**
* @param topVer Topology version.
* @param nodeOrder Node order.
* @param dataCenterId Data center id.
*/
private GridCacheVersion next(long topVer, long nodeOrder, byte dataCenterId) {
long ord = order.incrementAndGet();
GridCacheVersion next = new GridCacheVersion(
(int)topVer,
ord,
(int)nodeOrder,
dataCenterId);
last = next;
lastDataVer.value(ord);
return next;
}
/**
* @param topVer Topology version.
* @param nodeOrder Node order.
* @param dataCenterId Data center id.
*/
private GridCacheVersion nextForLoad(long topVer, long nodeOrder, byte dataCenterId) {
long ord = loadOrder.incrementAndGet();
GridCacheVersion next = new GridCacheVersion(
(int)topVer,
ord,
(int)nodeOrder,
dataCenterId);
last = next;
lastDataVer.value(ord);
return next;
}
/**
* Gets last generated version without generating a new one.
*
* @return Last generated version.
*/
public GridCacheVersion last() {
GridCacheVersion last0 = last;
return new GridCacheVersion(last0.topologyVersion(), localOrder(), last0.nodeOrder(), last0.dataCenterId());
}
/**
* @return Local order.
*/
public long localOrder() {
return order.get();
}
/**
* Gets start version.
*
* @return Start version.
*/
public GridCacheVersion startVersion() {
assert startVer != null;
return startVer;
}
/**
* Check if given version is start version.
*
* @param ver Version.
* @return {@code True} if given version is start version.
*/
public boolean isStartVersion(GridCacheVersion ver) {
return startVer.equals(ver);
}
}