blob: 3405d3563beceef89d29b57776f6cf040e07d9dc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.client.thin;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.client.ClientCacheConfiguration;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.client.Person;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
* Data replication operations test.
*/
@RunWith(Parameterized.class)
public class DataReplicationOperationsTest extends AbstractThinClientTest {
/** Keys count. */
private static final int KEYS_CNT = 10;
/** TTL. */
public static final int TTL = 1000;
/** */
private static IgniteClient client;
/** */
private static TcpClientCache<Object, Object> cache;
/** */
private final GridCacheVersion otherVer = new GridCacheVersion(1, 1, 1, 2);
/** {@code True} if operate with binary objects. */
@Parameterized.Parameter
public boolean binary;
/** Cache mode. */
@Parameterized.Parameter(1)
public CacheAtomicityMode mode;
/** @return Test parameters. */
@Parameterized.Parameters(name = "binary={0}, cacheMode={1}")
public static Collection<Object[]> parameters() {
List<Object[]> params = new ArrayList<>();
for (boolean binary : new boolean[]{false, true})
for (CacheAtomicityMode mode : new CacheAtomicityMode[]{TRANSACTIONAL, ATOMIC})
params.add(new Object[]{binary, mode});
return params;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
startGrids(2);
client = startClient(grid(0));
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
grid(0).destroyCaches(grid(0).cacheNames());
ClientCacheConfiguration ccfg = new ClientCacheConfiguration()
.setName(DEFAULT_CACHE_NAME)
.setAtomicityMode(mode);
cache = (TcpClientCache<Object, Object>)client.createCache(ccfg);
if (binary)
cache = (TcpClientCache<Object, Object>)cache.withKeepBinary();
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
super.afterTestsStopped();
client.close();
}
/** */
@Test
public void testPutAllConflict() {
Map<Object, T3<Object, GridCacheVersion, Long>> data = createPutAllData(CU.EXPIRE_TIME_ETERNAL);
cache.putAllConflict(data);
data.forEach((key, val) -> assertEquals(val.get1(), cache.get(key)));
}
/** */
@Test
public void testRemoveAllConflict() {
for (int i = 0; i < KEYS_CNT; i++)
cache.put(new Person(i, "Person-" + i), new Person(i, "Person-" + i));
Map<Object, GridCacheVersion> map = new HashMap<>();
for (int i = 0; i < KEYS_CNT; i++) {
Person key = new Person(i, "Person-" + i);
map.put(binary ? client.binary().toBinary(key) : key, otherVer);
}
cache.removeAllConflict(map);
map.keySet().forEach(key -> assertFalse(cache.containsKey(key)));
}
/** @throws Exception If fails. */
@Test
public void testWithPerEntryExpiry() throws Exception {
TcpClientCache<Object, Object> cache0 =
(TcpClientCache<Object, Object>)client.getOrCreateCache(DEFAULT_CACHE_NAME);
TcpClientCache<Object, Object> cache = binary ?
(TcpClientCache<Object, Object>)cache0.withKeepBinary() : cache0;
Map<Object, T3<Object, GridCacheVersion, Long>> data = createPutAllData(System.currentTimeMillis() + TTL);
cache.putAllConflict(data);
assertTrue(cache.containsKeys(data.keySet()));
assertTrue(waitForCondition(() -> data.keySet().stream().noneMatch(cache::containsKey), 2 * TTL));
}
/** */
@Test
public void testMixedExpiryTime() {
Map<Object, T3<Object, GridCacheVersion, Long>> put0 = createPutAllData(System.currentTimeMillis() + TTL);
Map<Object, T3<Object, GridCacheVersion, Long>> put1 = createPutAllData(KEYS_CNT, CU.EXPIRE_TIME_ETERNAL);
Map<Object, T3<Object, GridCacheVersion, Long>> drMap = new HashMap<>();
drMap.putAll(put0);
drMap.putAll(put1);
cache.putAllConflict(drMap);
assertTrue(cache.containsKeys(put0.keySet()));
assertTrue(cache.containsKeys(put1.keySet()));
}
/** */
private Map<Object, T3<Object, GridCacheVersion, Long>> createPutAllData(long expireTime) {
return createPutAllData(0, expireTime);
}
/** */
private Map<Object, T3<Object, GridCacheVersion, Long>> createPutAllData(int startKey, long expireTime) {
Map<Object, T3<Object, GridCacheVersion, Long>> map = new HashMap<>();
for (int i = startKey; i < startKey + KEYS_CNT; i++) {
Person key = new Person(i, "Person-" + i);
Person val = new Person(i, "Person-" + i);
map.put(binary ? client.binary().toBinary(key) : key,
new T3<>(binary ? client.binary().toBinary(val) : val, otherVer, expireTime));
}
return map;
}
}