| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.cdc; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.binary.BinaryObject; |
| import org.apache.ignite.cache.CacheAtomicityMode; |
| import org.apache.ignite.cache.CacheEntry; |
| import org.apache.ignite.cache.CacheEntryVersion; |
| import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.processors.cache.CacheObject; |
| import org.apache.ignite.internal.processors.cache.CacheObjectImpl; |
| import org.apache.ignite.internal.processors.cache.IgniteInternalCache; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObject; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; |
| import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| |
| import static java.util.Collections.singletonMap; |
| import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; |
| import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; |
| |
| /** |
| * Cache conflict operations test. |
| */ |
| @RunWith(Parameterized.class) |
| public class CacheConflictOperationsTest extends GridCommonAbstractTest { |
| /** Cache mode. */ |
| @Parameterized.Parameter |
| public CacheAtomicityMode cacheMode; |
| |
| /** Other cluster id. */ |
| @Parameterized.Parameter(1) |
| public byte otherClusterId; |
| |
| /** @return Test parameters. */ |
| @Parameterized.Parameters(name = "cacheMode={0}, otherClusterId={1}") |
| public static Collection<?> parameters() { |
| List<Object[]> params = new ArrayList<>(); |
| |
| for (CacheAtomicityMode mode : EnumSet.of(ATOMIC, TRANSACTIONAL)) |
| for (byte otherClusterId : new byte[] {FIRST_CLUSTER_ID, THIRD_CLUSTER_ID}) |
| params.add(new Object[] {mode, otherClusterId}); |
| |
| return params; |
| } |
| |
| /** */ |
| private static IgniteCache<String, ConflictResolvableTestData> cache; |
| |
| /** */ |
| private static IgniteInternalCache<BinaryObject, BinaryObject> cachex; |
| |
| /** */ |
| private static IgniteEx client; |
| |
| /** */ |
| private static final byte FIRST_CLUSTER_ID = 1; |
| |
| /** */ |
| private static final byte SECOND_CLUSTER_ID = 2; |
| |
| /** */ |
| private static final byte THIRD_CLUSTER_ID = 3; |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| CacheVersionConflictResolverPluginProvider<?> pluginCfg = new CacheVersionConflictResolverPluginProvider<>(); |
| |
| pluginCfg.setClusterId(SECOND_CLUSTER_ID); |
| pluginCfg.setCaches(new HashSet<>(Collections.singleton(DEFAULT_CACHE_NAME))); |
| pluginCfg.setConflictResolveField(conflictResolveField()); |
| |
| return super.getConfiguration(igniteInstanceName).setPluginProviders(pluginCfg); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTestsStarted() throws Exception { |
| startGrid(1); |
| |
| client = startClientGrid(2); |
| |
| cache = client.createCache(new CacheConfiguration<String, ConflictResolvableTestData>(DEFAULT_CACHE_NAME).setAtomicityMode(cacheMode)); |
| cachex = client.cachex(DEFAULT_CACHE_NAME); |
| } |
| |
| /** Tests that regular cache operations works with the conflict resolver when there is no update conflicts. */ |
| @Test |
| public void testSimpleUpdates() { |
| String key = "UpdatesWithoutConflict"; |
| |
| put(key); |
| put(key); |
| |
| remove(key); |
| } |
| |
| /** |
| * Tests that {@code IgniteInternalCache#*AllConflict} cache operations works with the conflict resolver |
| * when there is no update conflicts. |
| */ |
| @Test |
| public void testUpdatesFromOtherClusterWithoutConflict() throws Exception { |
| String key = key("UpdateFromOtherClusterWithoutConflict", otherClusterId); |
| |
| putConflict(key, 1, true); |
| |
| putConflict(key, 2, true); |
| |
| removeConflict(key, 3, true); |
| } |
| |
| /** |
| * Tests that {@code IgniteInternalCache#*AllConflict} cache operations works with the conflict resolver |
| * when there are update conflicts. |
| */ |
| @Test |
| public void testUpdatesReorderFromOtherCluster() throws Exception { |
| String key = key("UpdateClusterUpdateReorder", otherClusterId); |
| |
| putConflict(key, 2, true); |
| |
| // Update with the equal or lower order should ignored. |
| putConflict(key, 2, false); |
| putConflict(key, 1, false); |
| |
| // Remove with the equal or lower order should ignored. |
| removeConflict(key, 2, false); |
| removeConflict(key, 1, false); |
| |
| // Remove with the higher order should succeed. |
| putConflict(key, 3, true); |
| |
| key = key("UpdateClusterUpdateReorder2", otherClusterId); |
| |
| int order = 1; |
| |
| putConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), true); |
| |
| // Update with the equal or lower topVer should ignored. |
| putConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), false); |
| putConflict(key, new GridCacheVersion(1, order, 1, otherClusterId), false); |
| |
| // Remove with the equal or lower topVer should ignored. |
| removeConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), false); |
| removeConflict(key, new GridCacheVersion(1, order, 1, otherClusterId), false); |
| |
| // Remove with the higher topVer should succeed. |
| putConflict(key, new GridCacheVersion(3, order, 1, otherClusterId), true); |
| |
| key = key("UpdateClusterUpdateReorder3", otherClusterId); |
| |
| int topVer = 1; |
| |
| putConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), true); |
| |
| // Update with the equal or lower nodeOrder should ignored. |
| putConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), false); |
| putConflict(key, new GridCacheVersion(topVer, order, 1, otherClusterId), false); |
| |
| // Remove with the equal or lower nodeOrder should ignored. |
| removeConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), false); |
| removeConflict(key, new GridCacheVersion(topVer, order, 1, otherClusterId), false); |
| |
| // Remove with the higher nodeOrder should succeed. |
| putConflict(key, new GridCacheVersion(topVer, order, 3, otherClusterId), true); |
| } |
| |
| /** Tests cache operations for entry replicated from another cluster. */ |
| @Test |
| public void testUpdatesConflict() throws Exception { |
| String key = key("UpdateThisClusterConflict0", otherClusterId); |
| |
| putConflict(key, 1, true); |
| |
| // Local remove for other cluster entry should succeed. |
| remove(key); |
| |
| // Conflict replicated update should ignored. |
| // Resolve by field value not applicable because after remove operation "old" value doesn't exists. |
| putConflict(key, 2, false); |
| |
| key = key("UpdateThisDCConflict1", otherClusterId); |
| |
| putConflict(key, 3, true); |
| |
| // Local update for other cluster entry should succeed. |
| put(key); |
| |
| key = key("UpdateThisDCConflict2", otherClusterId); |
| |
| put(key); |
| |
| // Conflict replicated remove should ignored. |
| removeConflict(key, 4, false); |
| |
| key = key("UpdateThisDCConflict3", otherClusterId); |
| |
| put(key); |
| |
| // Conflict replicated update succeed only if resolved by field. |
| putConflict(key, 5, conflictResolveField() != null); |
| } |
| |
| /** */ |
| private void put(String key) { |
| ConflictResolvableTestData newVal = ConflictResolvableTestData.create(); |
| |
| CacheEntry<String, ConflictResolvableTestData> oldEntry = cache.getEntry(key); |
| |
| cache.put(key, newVal); |
| |
| CacheEntry<String, ConflictResolvableTestData> newEntry = cache.getEntry(key); |
| |
| assertNull(((CacheEntryVersion)newEntry.version()).otherClusterVersion()); |
| assertEquals(newVal, cache.get(key)); |
| |
| if (oldEntry != null) |
| assertTrue(((CacheEntryVersion)oldEntry.version()).order() < ((CacheEntryVersion)newEntry.version()).order()); |
| } |
| |
| /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */ |
| private void putConflict(String k, long order, boolean success) throws IgniteCheckedException { |
| putConflict(k, new GridCacheVersion(1, order, 1, otherClusterId), success); |
| } |
| |
| /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}. */ |
| private void putConflict(String k, GridCacheVersion newVer, boolean success) throws IgniteCheckedException { |
| CacheEntry<String, ConflictResolvableTestData> oldEntry = cache.getEntry(k); |
| ConflictResolvableTestData newVal = ConflictResolvableTestData.create(); |
| |
| KeyCacheObject key = new KeyCacheObjectImpl(k, null, cachex.context().affinity().partition(k)); |
| CacheObject val = new CacheObjectImpl(client.binary().toBinary(newVal), null); |
| |
| cachex.putAllConflict(singletonMap(key, new GridCacheDrInfo(val, newVer))); |
| |
| if (success) { |
| assertEquals(newVer, ((CacheEntryVersion)cache.getEntry(k).version()).otherClusterVersion()); |
| assertEquals(newVal, cache.get(k)); |
| } else if (oldEntry != null) { |
| assertEquals(oldEntry.getValue(), cache.get(k)); |
| assertEquals(oldEntry.version(), cache.getEntry(k).version()); |
| } |
| } |
| |
| /** */ |
| private void remove(String key) { |
| assertTrue(cache.containsKey(key)); |
| |
| cache.remove(key); |
| |
| assertFalse(cache.containsKey(key)); |
| } |
| |
| /** Removes entry via {@link IgniteInternalCache#removeAllConflict(Map)}. */ |
| private void removeConflict(String k, long order, boolean success) throws IgniteCheckedException { |
| removeConflict(k, new GridCacheVersion(1, order, 1, otherClusterId), success); |
| } |
| |
| /** Removes entry via {@link IgniteInternalCache#removeAllConflict(Map)}. */ |
| private void removeConflict(String k, GridCacheVersion ver, boolean success) throws IgniteCheckedException { |
| assertTrue(cache.containsKey(k)); |
| |
| CacheEntry<String, ConflictResolvableTestData> oldEntry = cache.getEntry(k); |
| |
| KeyCacheObject key = new KeyCacheObjectImpl(k, null, cachex.context().affinity().partition(k)); |
| |
| cachex.removeAllConflict(singletonMap(key, ver)); |
| |
| if (success) |
| assertFalse(cache.containsKey(k)); |
| else if (oldEntry != null) { |
| assertEquals(oldEntry.getValue(), cache.get(k)); |
| assertEquals(oldEntry.version(), cache.getEntry(k).version()); |
| } |
| } |
| |
| /** */ |
| private String key(String key, byte otherClusterId) { |
| return key + otherClusterId + cacheMode; |
| } |
| |
| /** */ |
| protected String conflictResolveField() { |
| return null; |
| } |
| } |