blob: 85315e2812081c3e70fcc81a4cdffb79d1cb64a1 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.ignite.cdc.conflictresolve;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
* This class implements simple conflict resolution algorithm.
* Algorithm decides which version of the entry should be used "new" or "old".
* The following steps performed:
* <ul>
* <li>If entry is freshly created then new version used - {@link GridCacheVersionedEntryEx#isStartVersion()}.</li>
* <li>If change made in this cluster then new version used - {@link GridCacheVersionedEntryEx#dataCenterId()}.</li>
* <li>If cluster of new entry equal to cluster of old entry
* then entry with the greater {@link GridCacheVersionedEntryEx#order()} used.</li>
* <li>If {@link #conflictResolveField} provided and field of new entry greater then new version used.</li>
* <li>If {@link #conflictResolveField} provided and field of old entry greater then old version used.</li>
* <li>Conflict can't be resolved. Update ignored. Old version used.</li>
* </ul>
public class CacheVersionConflictResolverImpl implements CacheVersionConflictResolver {
* Cluster id.
protected final byte clusterId;
* Field for conflict resolve.
* Value of this field will be used to compare two entries in case of conflicting changes.
* values of this field must implement {@link Comparable} interface.
* <pre><i>Note, value of this field used to resolve conflict for external updates only.</i>
* @see CacheVersionConflictResolverImpl
private final String conflictResolveField;
/** Logger. */
protected final IgniteLogger log;
/** If {@code true} then conflict resolving with the value field enabled. */
protected final boolean conflictResolveFieldEnabled;
* @param clusterId Data center id.
* @param conflictResolveField Field to resolve conflicts.
* @param log Logger.
public CacheVersionConflictResolverImpl(byte clusterId, String conflictResolveField, IgniteLogger log) {
this.clusterId = clusterId;
this.conflictResolveField = conflictResolveField;
this.log = log;
conflictResolveFieldEnabled = conflictResolveField != null;
/** {@inheritDoc} */
@Override public <K, V> GridCacheVersionConflictContext<K, V> resolve(
CacheObjectValueContext ctx,
GridCacheVersionedEntryEx<K, V> oldEntry,
GridCacheVersionedEntryEx<K, V> newEntry,
boolean atomicVerComparator
) {
GridCacheVersionConflictContext<K, V> res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);
boolean useNew = isUseNew(ctx, oldEntry, newEntry);
if (useNew)
return res;
* @param ctx Context.
* @param oldEntry Old entry.
* @param newEntry New entry.
* @param <K> Key type.
* @param <V> Key type.
* @return {@code True} is should use new entry.
@SuppressWarnings({"unchecked", "rawtypes"})
protected <K, V> boolean isUseNew(
CacheObjectValueContext ctx,
GridCacheVersionedEntryEx<K, V> oldEntry,
GridCacheVersionedEntryEx<K, V> newEntry
) {
if (newEntry.dataCenterId() == clusterId) // Update made on the local cluster always win.
return true;
if (oldEntry.isStartVersion()) // Entry absent (new entry).
return true;
if (oldEntry.dataCenterId() == newEntry.dataCenterId()) {
int cmp = newEntry.version().compareTo(oldEntry.version());
// Ignite sets the expire time to zero on backups for transaction caches.
// If CDC is running in onlyPrimary=false mode, then the updates from backups may be applied first.
// In this case, a new entry from the primary node should be used to set the expiration time.
// See GridDistributedTxRemoteAdapter#commitIfLocked
if (cmp == 0)
return newEntry.expireTime() > oldEntry.expireTime();
return cmp > 0; // New version from the same cluster.
if (conflictResolveFieldEnabled) {
Object oldVal = oldEntry.value(ctx);
Object newVal = newEntry.value(ctx);
if (oldVal != null && newVal != null) {
try {
return value(oldVal).compareTo(value(newVal)) < 0;
catch (Exception e) {
"Error while resolving replication conflict. [field=" + conflictResolveField + ", key=" + newEntry.key() + ']',
log.error("Conflict can't be resolved, " + (newEntry.value(ctx) == null ? "remove" : "update") + " ignored " +
"[key=" + newEntry.key() + ", fromCluster=" + newEntry.dataCenterId() + ", toCluster=" + oldEntry.dataCenterId() + ']');
// Ignoring update.
return false;
/** @return Conflict resolve field value. */
protected Comparable value(Object val) {
return (val instanceof BinaryObject)
? ((BinaryObject)val).field(conflictResolveField)
: U.field(val, conflictResolveField);
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheVersionConflictResolverImpl.class, this);