blob: e6f199bf05f8618930190b8e96ffd26ca243bb14 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.cdc;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static java.util.Collections.singletonMap;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
/**
* Cache conflict operations test.
*/
@RunWith(Parameterized.class)
public class CacheConflictOperationsTest extends GridCommonAbstractTest {
/** Cache mode. */
@Parameterized.Parameter
public CacheAtomicityMode cacheMode;
/** Other cluster id. */
@Parameterized.Parameter(1)
public byte otherClusterId;
/** @return Test parameters. */
@Parameterized.Parameters(name = "cacheMode={0}, otherClusterId={1}")
public static Collection<?> parameters() {
List<Object[]> params = new ArrayList<>();
for (CacheAtomicityMode mode : EnumSet.of(ATOMIC, TRANSACTIONAL))
for (byte otherClusterId : new byte[] {FIRST_CLUSTER_ID, THIRD_CLUSTER_ID})
params.add(new Object[] {mode, otherClusterId});
return params;
}
/** */
private static IgniteCache<String, ConflictResolvableTestData> cache;
/** */
private static IgniteInternalCache<BinaryObject, BinaryObject> cachex;
/** */
private static IgniteEx client;
/** */
private static final byte FIRST_CLUSTER_ID = 1;
/** */
private static final byte SECOND_CLUSTER_ID = 2;
/** */
private static final byte THIRD_CLUSTER_ID = 3;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
CacheVersionConflictResolverPluginProvider<?> pluginCfg = new CacheVersionConflictResolverPluginProvider<>();
pluginCfg.setClusterId(SECOND_CLUSTER_ID);
pluginCfg.setCaches(new HashSet<>(Collections.singleton(DEFAULT_CACHE_NAME)));
pluginCfg.setConflictResolveField(conflictResolveField());
return super.getConfiguration(igniteInstanceName).setPluginProviders(pluginCfg);
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
startGrid(1);
client = startClientGrid(2);
cache = client.createCache(new CacheConfiguration<String, ConflictResolvableTestData>(DEFAULT_CACHE_NAME).setAtomicityMode(cacheMode));
cachex = client.cachex(DEFAULT_CACHE_NAME);
}
/** Tests that regular cache operations works with the conflict resolver when there is no update conflicts. */
@Test
public void testSimpleUpdates() {
String key = "UpdatesWithoutConflict";
put(key);
put(key);
remove(key);
}
/**
* Tests that {@code IgniteInternalCache#*AllConflict} cache operations works with the conflict resolver
* when there is no update conflicts.
*/
@Test
public void testUpdatesFromOtherClusterWithoutConflict() throws Exception {
String key = key("UpdateFromOtherClusterWithoutConflict", otherClusterId);
putConflict(key, 1, true);
putConflict(key, 2, true);
removeConflict(key, 3, true);
}
/**
* Tests that {@code IgniteInternalCache#*AllConflict} cache operations works with the conflict resolver
* when there are update conflicts.
*/
@Test
public void testUpdatesReorderFromOtherCluster() throws Exception {
String key = key("UpdateClusterUpdateReorder", otherClusterId);
putConflict(key, 2, true);
// Update with the equal or lower order should ignored.
putConflict(key, 2, false);
putConflict(key, 1, false);
// Remove with the equal or lower order should ignored.
removeConflict(key, 2, false);
removeConflict(key, 1, false);
// Remove with the higher order should succeed.
putConflict(key, 3, true);
key = key("UpdateClusterUpdateReorder2", otherClusterId);
int order = 1;
putConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), true);
// Update with the equal or lower topVer should ignored.
putConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), false);
putConflict(key, new GridCacheVersion(1, order, 1, otherClusterId), false);
// Remove with the equal or lower topVer should ignored.
removeConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), false);
removeConflict(key, new GridCacheVersion(1, order, 1, otherClusterId), false);
// Remove with the higher topVer should succeed.
putConflict(key, new GridCacheVersion(3, order, 1, otherClusterId), true);
key = key("UpdateClusterUpdateReorder3", otherClusterId);
int topVer = 1;
putConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), true);
// Update with the equal or lower nodeOrder should ignored.
putConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), false);
putConflict(key, new GridCacheVersion(topVer, order, 1, otherClusterId), false);
// Remove with the equal or lower nodeOrder should ignored.
removeConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), false);
removeConflict(key, new GridCacheVersion(topVer, order, 1, otherClusterId), false);
// Remove with the higher nodeOrder should succeed.
putConflict(key, new GridCacheVersion(topVer, order, 3, otherClusterId), true);
}
/** Tests cache operations for entry replicated from another cluster. */
@Test
public void testUpdatesConflict() throws Exception {
String key = key("UpdateThisClusterConflict0", otherClusterId);
putConflict(key, 1, true);
// Local remove for other cluster entry should succeed.
remove(key);
// Conflict replicated update should ignored.
// Resolve by field value not applicable because after remove operation "old" value doesn't exists.
putConflict(key, 2, false);
key = key("UpdateThisDCConflict1", otherClusterId);
putConflict(key, 3, true);
// Local update for other cluster entry should succeed.
put(key);
key = key("UpdateThisDCConflict2", otherClusterId);
put(key);
// Conflict replicated remove should ignored.
removeConflict(key, 4, false);
key = key("UpdateThisDCConflict3", otherClusterId);
put(key);
// Conflict replicated update succeed only if resolved by field.
putConflict(key, 5, conflictResolveField() != null);
}
/** */
private void put(String key) {
ConflictResolvableTestData newVal = ConflictResolvableTestData.create();
CacheEntry<String, ConflictResolvableTestData> oldEntry = cache.getEntry(key);
cache.put(key, newVal);
CacheEntry<String, ConflictResolvableTestData> newEntry = cache.getEntry(key);
assertNull(((CacheEntryVersion)newEntry.version()).otherClusterVersion());
assertEquals(newVal, cache.get(key));
if (oldEntry != null)
assertTrue(((CacheEntryVersion)oldEntry.version()).order() < ((CacheEntryVersion)newEntry.version()).order());
}
/** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */
private void putConflict(String k, long order, boolean success) throws IgniteCheckedException {
putConflict(k, new GridCacheVersion(1, order, 1, otherClusterId), success);
}
/** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */
private void putConflict(String k, GridCacheVersion newVer, boolean success) throws IgniteCheckedException {
CacheEntry<String, ConflictResolvableTestData> oldEntry = cache.getEntry(k);
ConflictResolvableTestData newVal = ConflictResolvableTestData.create();
KeyCacheObject key = new KeyCacheObjectImpl(k, null, cachex.context().affinity().partition(k));
CacheObject val = new CacheObjectImpl(client.binary().toBinary(newVal), null);
cachex.putAllConflict(singletonMap(key, new GridCacheDrInfo(val, newVer)));
if (success) {
assertEquals(newVer, ((CacheEntryVersion)cache.getEntry(k).version()).otherClusterVersion());
assertEquals(newVal, cache.get(k));
} else if (oldEntry != null) {
assertEquals(oldEntry.getValue(), cache.get(k));
assertEquals(oldEntry.version(), cache.getEntry(k).version());
}
}
/** */
private void remove(String key) {
assertTrue(cache.containsKey(key));
cache.remove(key);
assertFalse(cache.containsKey(key));
}
/** Removes entry via {@link IgniteInternalCache#removeAllConflict(Map)}. */
private void removeConflict(String k, long order, boolean success) throws IgniteCheckedException {
removeConflict(k, new GridCacheVersion(1, order, 1, otherClusterId), success);
}
/** Removes entry via {@link IgniteInternalCache#removeAllConflict(Map)}. */
private void removeConflict(String k, GridCacheVersion ver, boolean success) throws IgniteCheckedException {
assertTrue(cache.containsKey(k));
CacheEntry<String, ConflictResolvableTestData> oldEntry = cache.getEntry(k);
KeyCacheObject key = new KeyCacheObjectImpl(k, null, cachex.context().affinity().partition(k));
cachex.removeAllConflict(singletonMap(key, ver));
if (success)
assertFalse(cache.containsKey(k));
else if (oldEntry != null) {
assertEquals(oldEntry.getValue(), cache.get(k));
assertEquals(oldEntry.version(), cache.getEntry(k).version());
}
}
/** */
private String key(String key, byte otherClusterId) {
return key + otherClusterId + cacheMode;
}
/** */
protected String conflictResolveField() {
return null;
}
}