blob: a6551eaee6d3568b9ecbb50554250ed680a9fdd4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.cdc.conflictresolve;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
* This class implements simple conflict resolution algorithm.
* Algorithm decides which version of the entry should be used "new" or "old".
* The following steps performed:
* <ul>
* <li>If entry is freshly created then new version used - {@link GridCacheVersionedEntryEx#isStartVersion()}.</li>
* <li>If change made in this cluster then new version used - {@link GridCacheVersionedEntryEx#dataCenterId()}.</li>
* <li>If cluster of new entry equal to cluster of old entry
* then entry with the greater {@link GridCacheVersionedEntryEx#order()} used.</li>
* <li>If {@link #conflictResolveField} provided and field of new entry greater then new version used.</li>
* <li>If {@link #conflictResolveField} provided and field of old entry greater then old version used.</li>
* <li>Conflict can't be resolved. Update ignored. Old version used.</li>
* </ul>
*/
public class CacheVersionConflictResolverImpl implements CacheVersionConflictResolver {
/**
* Cluster id.
*/
private final byte clusterId;
/**
* Field for conflict resolve.
* Value of this field will be used to compare two entries in case of conflicting changes.
* values of this field must implement {@link Comparable} interface.
* <pre><i>Note, value of this field used to resolve conflict for external updates only.</i>
*
* @see CacheVersionConflictResolverImpl
*/
private final String conflictResolveField;
/** Logger. */
private final IgniteLogger log;
/** If {@code true} then conflict resolving with the value field enabled. */
private final boolean conflictResolveFieldEnabled;
/**
* @param clusterId Data center id.
* @param conflictResolveField Field to resolve conflicts.
* @param log Logger.
*/
public CacheVersionConflictResolverImpl(byte clusterId, String conflictResolveField, IgniteLogger log) {
this.clusterId = clusterId;
this.conflictResolveField = conflictResolveField;
this.log = log;
conflictResolveFieldEnabled = conflictResolveField != null;
}
/** {@inheritDoc} */
@Override public <K, V> GridCacheVersionConflictContext<K, V> resolve(
CacheObjectValueContext ctx,
GridCacheVersionedEntryEx<K, V> oldEntry,
GridCacheVersionedEntryEx<K, V> newEntry,
boolean atomicVerComparator
) {
GridCacheVersionConflictContext<K, V> res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);
if (isUseNew(ctx, oldEntry, newEntry))
res.useNew();
else
res.useOld();
return res;
}
/**
* @param ctx Context.
* @param oldEntry Old entry.
* @param newEntry New entry.
* @param <K> Key type.
* @param <V> Key type.
* @return {@code True} is should use new entry.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private <K, V> boolean isUseNew(
CacheObjectValueContext ctx,
GridCacheVersionedEntryEx<K, V> oldEntry,
GridCacheVersionedEntryEx<K, V> newEntry
) {
if (newEntry.dataCenterId() == clusterId) // Update made on the local cluster always win.
return true;
if (oldEntry.isStartVersion()) // Entry absent (new entry).
return true;
if (oldEntry.dataCenterId() == newEntry.dataCenterId())
return newEntry.version().compareTo(oldEntry.version()) > 0; // New version from the same cluster.
if (conflictResolveFieldEnabled) {
Object oldVal = oldEntry.value(ctx);
Object newVal = newEntry.value(ctx);
if (oldVal != null && newVal != null) {
Comparable oldResolveField;
Comparable newResolveField;
try {
if (oldVal instanceof BinaryObject) {
oldResolveField = ((BinaryObject)oldVal).field(conflictResolveField);
newResolveField = ((BinaryObject)newVal).field(conflictResolveField);
}
else {
oldResolveField = U.field(oldVal, conflictResolveField);
newResolveField = U.field(newVal, conflictResolveField);
}
return oldResolveField.compareTo(newResolveField) < 0;
}
catch (Exception e) {
log.error(
"Error while resolving replication conflict. [field=" + conflictResolveField + ", key=" + newEntry.key() + ']',
e
);
}
}
}
log.error("Conflict can't be resolved, update ignored [key=" + newEntry.key() + ", fromCluster=" + newEntry.dataCenterId()
+ ", toCluster=" + oldEntry.dataCenterId() + ']');
// Ignoring update.
return false;
}
}