blob: 792a9980e30e473c93021f08d660096f87dae8b3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.client;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.cache.expiry.AccessedExpiryPolicy;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ModifiedExpiryPolicy;
import com.google.common.collect.ImmutableSet;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheKeyConfiguration;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.AbstractBinaryArraysTest;
import org.apache.ignite.internal.client.thin.ClientServerError;
import org.apache.ignite.internal.processors.cache.CacheEnumOperationsAbstractTest.TestEnum;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.mxbean.ClientProcessorMXBean;
import org.apache.ignite.spi.systemview.view.SystemView;
import org.apache.ignite.spi.systemview.view.TransactionView;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionIsolation;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import static org.apache.ignite.internal.processors.cache.CacheEnumOperationsAbstractTest.TestEnum.VAL1;
import static org.apache.ignite.internal.processors.cache.CacheEnumOperationsAbstractTest.TestEnum.VAL2;
import static org.apache.ignite.internal.processors.cache.CacheEnumOperationsAbstractTest.TestEnum.VAL3;
import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager.TXS_MON_LIST;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
import static org.junit.Assert.assertArrayEquals;
/**
* Thin client functional tests.
*/
public class FunctionalTest extends AbstractBinaryArraysTest {
/** Per test timeout */
@SuppressWarnings("deprecation")
@Rule
public Timeout globalTimeout = new Timeout((int)GridTestUtils.DFLT_TEST_TIMEOUT);
/**
* Tested API:
* <ul>
* <li>{@link IgniteClient#cache(String)}</li>
* <li>{@link IgniteClient#getOrCreateCache(ClientCacheConfiguration)}</li>
* <li>{@link IgniteClient#cacheNames()}</li>
* <li>{@link IgniteClient#createCache(String)}</li>
* <li>{@link IgniteClient#createCache(ClientCacheConfiguration)}</li>
* <li>{@link IgniteCache#size(CachePeekMode...)}</li>
* </ul>
*/
@Test
public void testCacheManagement() throws Exception {
try (LocalIgniteCluster ignored = LocalIgniteCluster.start(2);
IgniteClient client = Ignition.startClient(getClientConfiguration())
) {
final String CACHE_NAME = "testCacheManagement";
ClientCacheConfiguration cacheCfg = new ClientCacheConfiguration().setName(CACHE_NAME)
.setCacheMode(CacheMode.REPLICATED)
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
int key = 1;
Person val = new Person(key, Integer.toString(key));
ClientCache<Integer, Person> cache = client.getOrCreateCache(cacheCfg);
cache.put(key, val);
assertEquals(1, cache.size());
assertEquals(2, cache.size(CachePeekMode.ALL));
cache = client.cache(CACHE_NAME);
Person cachedVal = cache.get(key);
assertEquals(val, cachedVal);
Object[] cacheNames = new TreeSet<>(client.cacheNames()).toArray();
assertArrayEquals(new TreeSet<>(Arrays.asList(Config.DEFAULT_CACHE_NAME, CACHE_NAME)).toArray(), cacheNames);
client.destroyCache(CACHE_NAME);
cacheNames = client.cacheNames().toArray();
assertArrayEquals(new Object[] {Config.DEFAULT_CACHE_NAME}, cacheNames);
cache = client.createCache(CACHE_NAME);
assertFalse(cache.containsKey(key));
cacheNames = client.cacheNames().toArray();
assertArrayEquals(new TreeSet<>(Arrays.asList(Config.DEFAULT_CACHE_NAME, CACHE_NAME)).toArray(), cacheNames);
client.destroyCache(CACHE_NAME);
cache = client.createCache(cacheCfg);
assertFalse(cache.containsKey(key));
assertArrayEquals(new TreeSet<>(Arrays.asList(Config.DEFAULT_CACHE_NAME, CACHE_NAME)).toArray(), cacheNames);
}
}
/**
* Tested API:
* <ul>
* <li>{@link ClientCache#getName()}</li>
* <li>{@link ClientCache#getConfiguration()}</li>
* </ul>
*/
@Test
public void testCacheConfiguration() throws Exception {
final String dataRegionName = "functional-test-data-region";
IgniteConfiguration cfg = Config.getServerConfiguration()
.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setName(dataRegionName)));
try (Ignite ignored = Ignition.start(cfg);
IgniteClient client = Ignition.startClient(getClientConfiguration())
) {
final String CACHE_NAME = "testCacheConfiguration";
ClientCacheConfiguration cacheCfgTemplate = new ClientCacheConfiguration().setName(CACHE_NAME)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setBackups(3)
.setCacheMode(CacheMode.PARTITIONED)
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
.setEagerTtl(false)
.setGroupName("FunctionalTest")
.setDefaultLockTimeout(12345)
.setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE)
.setReadFromBackup(true)
.setRebalanceBatchSize(67890)
.setRebalanceBatchesPrefetchCount(102938)
.setRebalanceDelay(54321)
.setRebalanceMode(CacheRebalanceMode.SYNC)
.setRebalanceOrder(2)
.setRebalanceThrottle(564738)
.setRebalanceTimeout(142536)
.setKeyConfiguration(new CacheKeyConfiguration("Employee", "orgId"))
.setQueryEntities(new QueryEntity(int.class.getName(), "Employee")
.setTableName("EMPLOYEE")
.setFields(
Stream.of(
new SimpleEntry<>("id", Integer.class.getName()),
new SimpleEntry<>("orgId", Integer.class.getName())
).collect(Collectors.toMap(
SimpleEntry::getKey, SimpleEntry::getValue, (a, b) -> a, LinkedHashMap::new
))
)
// During query normalization null keyFields become empty set.
// Set empty collection for comparator.
.setKeyFields(Collections.emptySet())
.setKeyFieldName("id")
.setNotNullFields(Collections.singleton("id"))
.setDefaultFieldValues(Collections.singletonMap("id", 0))
.setIndexes(Collections.singletonList(new QueryIndex("id", true, "IDX_EMPLOYEE_ID")))
.setAliases(Stream.of("id", "orgId").collect(Collectors.toMap(f -> f, String::toUpperCase)))
)
.setExpiryPolicy(new PlatformExpiryPolicy(10, 20, 30))
.setCopyOnRead(!CacheConfiguration.DFLT_COPY_ON_READ)
.setDataRegionName(dataRegionName)
.setMaxConcurrentAsyncOperations(4)
.setMaxQueryIteratorsCount(4)
.setOnheapCacheEnabled(true)
.setQueryDetailMetricsSize(1024)
.setQueryParallelism(4)
.setSqlEscapeAll(true)
.setSqlIndexMaxInlineSize(1024)
.setSqlSchema("functional-test-schema")
.setStatisticsEnabled(true);
ClientCacheConfiguration cacheCfg = new ClientCacheConfiguration(cacheCfgTemplate);
ClientCache<Object, Object> cache = client.createCache(cacheCfg);
assertEquals(CACHE_NAME, cache.getName());
assertTrue(Comparers.equal(cacheCfgTemplate, cache.getConfiguration()));
}
}
/**
* Tested API:
* <ul>
* <li>{@link Ignition#startClient(ClientConfiguration)}</li>
* <li>{@link IgniteClient#getOrCreateCache(String)}</li>
* <li>{@link ClientCache#put(Object, Object)}</li>
* <li>{@link ClientCache#get(Object)}</li>
* <li>{@link ClientCache#containsKey(Object)}</li>
* <li>{@link ClientCache#clear(Object)}</li>
* </ul>
*/
@Test
public void testPutGet() throws Exception {
// Existing cache, primitive key and object value
try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
IgniteClient client = Ignition.startClient(getClientConfiguration())
) {
ClientCache<Integer, Person> cache = client.getOrCreateCache(Config.DEFAULT_CACHE_NAME);
Integer key = 1;
Person val = new Person(key, "Joe");
cache.put(key, val);
assertTrue(cache.containsKey(key));
Person cachedVal = cache.get(key);
assertEquals(val, cachedVal);
cache.clear(key);
assertFalse(cache.containsKey(key));
assertNull(cache.get(key));
}
// Non-existing cache, object key and primitive value
try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
IgniteClient client = Ignition.startClient(getClientConfiguration())
) {
ClientCache<Person, Integer> cache = client.getOrCreateCache("testPutGet");
Integer val = 1;
Person key = new Person(val, "Joe");
cache.put(key, val);
Integer cachedVal = cache.get(key);
assertEquals(val, cachedVal);
cache.clear(key);
assertFalse(cache.containsKey(key));
assertNull(cache.get(key));
}
// Object key and Object value
try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
IgniteClient client = Ignition.startClient(getClientConfiguration())
) {
ClientCache<Person, Person> cache = client.getOrCreateCache("testPutGet");
Person key = new Person(1, "Joe Key");
Person val = new Person(1, "Joe Value");
cache.put(key, val);
Person cachedVal = cache.get(key);
assertEquals(val, cachedVal);
cache.clear(key);
assertFalse(cache.containsKey(key));
assertNull(cache.get(key));
}
}
/**
* Test cache operations with different data types.
*/
@Test
public void testDataTypes() throws Exception {
try (Ignite ignite = Ignition.start(Config.getServerConfiguration());
IgniteClient client = Ignition.startClient(getClientConfiguration())
) {
ignite.getOrCreateCache(Config.DEFAULT_CACHE_NAME);
Person person = new Person(1, "name");
// Primitive and built-in types.
checkDataType(client, ignite, (byte)1);
checkDataType(client, ignite, (short)1);
checkDataType(client, ignite, 1);
checkDataType(client, ignite, 1L);
checkDataType(client, ignite, 1.0f);
checkDataType(client, ignite, 1.0d);
checkDataType(client, ignite, 'c');
checkDataType(client, ignite, true);
checkDataType(client, ignite, "string");
checkDataType(client, ignite, UUID.randomUUID());
checkDataType(client, ignite, new Date());
// Enum.
checkDataType(client, ignite, VAL1);
// Binary object.
checkDataType(client, ignite, person);
// Arrays.
checkDataType(client, ignite, new byte[] {(byte)1});
checkDataType(client, ignite, new short[] {(short)1});
checkDataType(client, ignite, new int[] {1});
checkDataType(client, ignite, new long[] {1L});
checkDataType(client, ignite, new float[] {1.0f});
checkDataType(client, ignite, new double[] {1.0d});
checkDataType(client, ignite, new char[] {'c'});
checkDataType(client, ignite, new boolean[] {true});
checkDataType(client, ignite, new String[] {"string"});
checkDataType(client, ignite, new UUID[] {UUID.randomUUID()});
checkDataType(client, ignite, new Date[] {new Date()});
checkDataType(client, ignite, new int[][] {new int[] {1}});
checkDataType(client, ignite, new TestEnum[] {VAL1, VAL2, VAL3});
checkDataType(client, ignite, new Person[] {person});
checkDataType(client, ignite, new Person[][] {new Person[] {person}});
checkDataType(client, ignite, new Object[] {1, "string", person, new Person[] {person}});
// Lists.
checkDataType(client, ignite, Collections.emptyList());
checkDataType(client, ignite, Collections.singletonList(person));
checkDataType(client, ignite, Arrays.asList(person, person));
checkDataType(client, ignite, new ArrayList<>(Arrays.asList(person, person)));
checkDataType(client, ignite, new LinkedList<>(Arrays.asList(person, person)));
checkDataType(client, ignite, Arrays.asList(Arrays.asList(person, person), person));
// Sets.
checkDataType(client, ignite, Collections.emptySet());
checkDataType(client, ignite, Collections.singleton(person));
checkDataType(client, ignite, new HashSet<>(Arrays.asList(1, 2)));
checkDataType(client, ignite, new HashSet<>(Arrays.asList(Arrays.asList(person, person), person)));
checkDataType(client, ignite, new HashSet<>(new ArrayList<>(Arrays.asList(Arrays.asList(person,
person), person))));
// Maps.
checkDataType(client, ignite, Collections.emptyMap());
checkDataType(client, ignite, Collections.singletonMap(1, person));
checkDataType(client, ignite, F.asMap(1, person));
checkDataType(client, ignite, new HashMap<>(F.asMap(1, person)));
checkDataType(client, ignite, new HashMap<>(F.asMap(new HashSet<>(Arrays.asList(1, 2)),
Arrays.asList(person, person))));
}
}
/**
* Check that we get the same value from the cache as we put before.
*
* @param client Thin client.
* @param ignite Ignite node.
* @param obj Value of data type to check.
*/
private void checkDataType(IgniteClient client, Ignite ignite, Object obj) {
IgniteCache<Object, Object> thickCache = ignite.cache(Config.DEFAULT_CACHE_NAME);
ClientCache<Object, Object> thinCache = client.cache(Config.DEFAULT_CACHE_NAME);
Integer key = 1;
thinCache.put(key, obj);
assertTrue(thinCache.containsKey(key));
Object cachedObj = thinCache.get(key);
assertEqualsArraysAware(obj, cachedObj);
assertEqualsArraysAware(obj, thickCache.get(key));
assertEquals(client.binary().typeId(obj.getClass().getName()), ignite.binary().typeId(obj.getClass().getName()));
// Server-side comparison with the original object.
assertTrue(thinCache.replace(key, obj, obj));
// Server-side comparison with the restored object.
assertTrue(thinCache.remove(key, cachedObj));
}
/**
* Tested API:
* <ul>
* <li>{@link ClientCache#putAll(Map)}</li>
* <li>{@link ClientCache#getAll(Set)}</li>
* <li>{@link ClientCache#containsKeys(Set)} (Set)}</li>
* <li>{@link ClientCache#clear()}</li>
* <li>{@link ClientCache#clearAll(Set)} ()}</li>
* </ul>
*/
@Test
public void testBatchPutGet() throws Exception {
// Existing cache, primitive key and object value
try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
IgniteClient client = Ignition.startClient(getClientConfiguration())
) {
ClientCache<Integer, Person> cache = client.cache(Config.DEFAULT_CACHE_NAME);
Map<Integer, Person> data = IntStream
.rangeClosed(1, 1000).boxed()
.collect(Collectors.toMap(i -> i, i -> new Person(i, String.format("Person %s", i))));
assertFalse(cache.containsKeys(data.keySet()));
cache.putAll(data);
assertTrue(cache.containsKeys(data.keySet()));
Map<Integer, Person> cachedData = cache.getAll(data.keySet());
assertEquals(data, cachedData);
}
// Non-existing cache, object key and primitive value
try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
IgniteClient client = Ignition.startClient(getClientConfiguration())
) {
ClientCache<Person, Integer> cache = client.createCache("testBatchPutGet");
Map<Person, Integer> data = IntStream
.rangeClosed(1, 1000).boxed()
.collect(Collectors.toMap(i -> new Person(i, String.format("Person %s", i)), i -> i));
assertFalse(cache.containsKeys(data.keySet()));
cache.putAll(data);
assertTrue(cache.containsKeys(data.keySet()));
Map<Person, Integer> cachedData = cache.getAll(data.keySet());
assertEquals(data, cachedData);
Set<Person> clearKeys = new HashSet<>();
Iterator<Person> keyIter = data.keySet().iterator();
for (int i = 0; i < 100; i++)
clearKeys.add(keyIter.next());
cache.clearAll(clearKeys);
assertFalse(cache.containsKeys(clearKeys));
assertTrue(cache.containsKeys(
data.keySet().stream().filter(key -> !data.containsKey(key)).collect(Collectors.toSet())));
assertEquals(data.size() - clearKeys.size(), cache.size(CachePeekMode.ALL));
cache.clear();
assertFalse(cache.containsKeys(data.keySet()));
assertEquals(0, cache.size(CachePeekMode.ALL));
}
}
/**
* Tested API:
* <ul>
* <li>{@link ClientCache#getAndPut(Object, Object)}</li>
* <li>{@link ClientCache#getAndRemove(Object)}</li>
* <li>{@link ClientCache#getAndReplace(Object, Object)}</li>
* <li>{@link ClientCache#putIfAbsent(Object, Object)}</li>
* <li>{@link ClientCache#getAndPutIfAbsent(Object, Object)}</li>
* </ul>
*/
@Test
public void testAtomicPutGet() throws Exception {
try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
IgniteClient client = Ignition.startClient(getClientConfiguration())
) {
ClientCache<Integer, String> cache = client.createCache("testRemoveReplace");
assertNull(cache.getAndPut(1, "1"));
assertEquals("1", cache.getAndPut(1, "1.1"));
assertEquals("1.1", cache.getAndRemove(1));
assertNull(cache.getAndRemove(1));
assertTrue(cache.putIfAbsent(1, "1"));
assertFalse(cache.putIfAbsent(1, "1.1"));
assertEquals("1", cache.getAndReplace(1, "1.1"));
assertEquals("1.1", cache.getAndReplace(1, "1"));
assertNull(cache.getAndReplace(2, "2"));
assertEquals("1", cache.getAndPutIfAbsent(1, "1.1"));
assertEquals("1", cache.get(1));
assertNull(cache.getAndPutIfAbsent(3, "3"));
assertEquals("3", cache.get(3));
}
}
/**
* Tested API:
* <ul>
* <li>{@link ClientCache#replace(Object, Object)}</li>
* <li>{@link ClientCache#replace(Object, Object, Object)}</li>
* <li>{@link ClientCache#remove(Object)}</li>
* <li>{@link ClientCache#remove(Object, Object)}</li>
* <li>{@link ClientCache#removeAll()}</li>
* <li>{@link ClientCache#removeAll(Set)}</li>
* </ul>
*/
@Test
public void testRemoveReplace() throws Exception {
try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
IgniteClient client = Ignition.startClient(getClientConfiguration())
) {
ClientCache<Integer, String> cache = client.createCache("testRemoveReplace");
Map<Integer, String> data = IntStream.rangeClosed(1, 100).boxed()
.collect(Collectors.toMap(i -> i, Object::toString));
cache.putAll(data);
assertFalse(cache.replace(1, "2", "3"));
assertEquals("1", cache.get(1));
assertTrue(cache.replace(1, "1", "3"));
assertEquals("3", cache.get(1));
assertFalse(cache.replace(101, "101"));
assertNull(cache.get(101));
assertTrue(cache.replace(100, "101"));
assertEquals("101", cache.get(100));
assertFalse(cache.remove(101));
assertTrue(cache.remove(100));
assertNull(cache.get(100));
assertFalse(cache.remove(99, "100"));
assertEquals("99", cache.get(99));
assertTrue(cache.remove(99, "99"));
assertNull(cache.get(99));
cache.put(101, "101");
cache.removeAll(data.keySet());
assertEquals(1, cache.size());
assertEquals("101", cache.get(101));
cache.removeAll();
assertEquals(0, cache.size());
}
}
/**
* Test client fails on start if server is unavailable
*/
@Test
public void testClientFailsOnStart() {
ClientConnectionException expEx = null;
//noinspection EmptyTryBlock
try (IgniteClient ignored = Ignition.startClient(getClientConfiguration())) {
// No-op.
}
catch (ClientConnectionException connEx) {
expEx = connEx;
}
catch (Exception ex) {
fail(String.format(
"%s expected but %s was received: %s",
ClientConnectionException.class.getName(),
ex.getClass().getName(),
ex
));
}
assertNotNull(
String.format("%s expected but no exception was received", ClientConnectionException.class.getName()),
expEx
);
}
/**
* Test PESSIMISTIC REPEATABLE_READ tx holds lock and other tx should be timed out.
*/
@Test
public void testPessimisticRepeatableReadsTransactionHoldsLock() throws Exception {
testPessimisticTxLocking(REPEATABLE_READ);
}
/**
* Test PESSIMISTIC SERIALIZABLE tx holds lock and other tx should be timed out.
*/
@Test
public void testPessimisticSerializableTransactionHoldsLock() throws Exception {
testPessimisticTxLocking(SERIALIZABLE);
}
/**
* Test pessimistic tx holds the lock.
*/
private void testPessimisticTxLocking(TransactionIsolation isolation) throws Exception {
try (Ignite ignite = Ignition.start(Config.getServerConfiguration());
IgniteClient client = Ignition.startClient(getClientConfiguration())
) {
ClientCache<Integer, String> cache = client.createCache(new ClientCacheConfiguration()
.setName("cache")
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
);
cache.put(0, "value0");
IgniteInternalFuture<?> fut;
try (ClientTransaction tx = client.transactions().txStart(PESSIMISTIC, isolation)) {
assertEquals("value0", cache.get(0));
CyclicBarrier barrier = new CyclicBarrier(2);
fut = GridTestUtils.runAsync(() -> {
try (ClientTransaction tx2 = client.transactions().txStart(OPTIMISTIC, REPEATABLE_READ, 500)) {
cache.put(0, "value2");
tx2.commit();
}
finally {
try {
barrier.await(2000, TimeUnit.MILLISECONDS);
}
catch (Throwable ignore) {
// No-op.
}
}
});
barrier.await(2000, TimeUnit.MILLISECONDS);
tx.commit();
}
assertEquals("value0", cache.get(0));
assertThrowsAnyCause(null, fut::get, ClientException.class,
"Failed to acquire lock within provided timeout");
}
}
/**
* Test OPTIMISTIC SERIALIZABLE tx rolls backs if another TX commits.
*/
@Test
public void testOptimitsticSerializableTransactionHoldsLock() throws Exception {
try (Ignite ignite = Ignition.start(Config.getServerConfiguration());
IgniteClient client = Ignition.startClient(getClientConfiguration())
) {
ClientCache<Integer, String> cache = client.createCache(new ClientCacheConfiguration()
.setName("cache")
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
);
cache.put(0, "value0");
final CountDownLatch latch = new CountDownLatch(1);
try (ClientTransaction tx = client.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
assertEquals("value0", cache.get(0));
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
try (ClientTransaction tx2 = client.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) {
cache.put(0, "value2");
tx2.commit();
}
finally {
latch.countDown();
}
});
latch.await();
cache.put(0, "value1");
fut.get();
assertThrowsAnyCause(null, () -> {
tx.commit();
return null;
}, ClientException.class, "read/write conflict");
}
assertEquals("value2", cache.get(0));
}
}
/**
* Test OPTIMISTIC REPEATABLE_READ tx doesn't conflict with a regular cache put.
*/
@Test
public void testOptimitsticRepeatableReadUpdatesValue() throws Exception {
try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
IgniteClient client = Ignition.startClient(getClientConfiguration())
) {
ClientCache<Integer, String> cache = client.createCache(new ClientCacheConfiguration()
.setName("cache")
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
);
cache.put(0, "value0");
try (ClientTransaction tx = client.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) {
assertEquals("value0", cache.get(0));
cache.put(0, "value1");
GridTestUtils.runAsync(() -> {
assertEquals("value0", cache.get(0));
cache.put(0, "value2");
assertEquals("value2", cache.get(0));
}).get();
tx.commit();
}
assertEquals("value1", cache.get(0));
}
}
/**
* Test that client-connector worker can process further transactional requests (resume transactions) after
* external termination of previous transaction.
*/
@Test
public void testTxResumeAfterTxTimeout() throws Exception {
IgniteConfiguration cfg = Config.getServerConfiguration().setClientConnectorConfiguration(
new ClientConnectorConfiguration().setThreadPoolSize(1));
try (Ignite ignite = Ignition.start(cfg); IgniteClient client = Ignition.startClient(getClientConfiguration())) {
String cacheName = "cache";
IgniteCache<Object, Object> igniteCache = ignite.createCache(new CacheConfiguration<>(cacheName)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
try (ClientTransaction clientTx = client.transactions().txStart()) {
runAsync(() -> {
try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
igniteCache.put(0, 0); // Lock key by ignite node.
try {
// Start, but don't close the transaction (to keep it in the threadMap after timeout).
client.transactions().txStart(PESSIMISTIC, READ_COMMITTED, 200L);
// Wait until transaction interrupted externally by timeout.
client.cache(cacheName).put(0, 0);
fail();
}
catch (ClientException ignored) {
// Expected.
}
}
}).get();
// Resume tx in the worker with interrupted transaction.
assertFalse(client.cache(cacheName).containsKey(0));
}
}
}
/**
* Test transactions.
*/
@Test
public void testTransactions() throws Exception {
try (Ignite ignite = Ignition.start(Config.getServerConfiguration());
IgniteClient client = Ignition.startClient(getClientConfiguration())
) {
ClientCache<Integer, String> cache = client.createCache(new ClientCacheConfiguration()
.setName("cache")
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
);
cache.put(0, "value0");
cache.put(1, "value1");
// Test nested transactions is not possible.
try (ClientTransaction tx = client.transactions().txStart()) {
try (ClientTransaction tx1 = client.transactions().txStart()) {
fail();
}
catch (ClientException expected) {
// No-op.
}
}
// Test implicit rollback when transaction closed.
try (ClientTransaction tx = client.transactions().txStart()) {
cache.put(1, "value2");
}
assertEquals("value1", cache.get(1));
// Test explicit rollback.
try (ClientTransaction tx = client.transactions().txStart()) {
cache.put(1, "value2");
tx.rollback();
}
assertEquals("value1", cache.get(1));
// Test commit.
try (ClientTransaction tx = client.transactions().txStart()) {
cache.put(1, "value2");
tx.commit();
}
assertEquals("value2", cache.get(1));
// Test end of already completed transaction.
ClientTransaction tx0 = client.transactions().txStart();
tx0.close();
try {
tx0.commit();
fail();
}
catch (ClientException expected) {
// No-op.
}
// Test end of outdated transaction.
try (ClientTransaction tx = client.transactions().txStart()) {
try {
tx0.commit();
fail();
}
catch (ClientException expected) {
// No-op.
}
tx.commit();
}
// Test transaction with a timeout.
long TX_TIMEOUT = 200L;
try (ClientTransaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED, TX_TIMEOUT)) {
long txStartedTime = U.currentTimeMillis();
cache.put(1, "value3");
while (txStartedTime + TX_TIMEOUT >= U.currentTimeMillis())
U.sleep(100L);
try {
cache.put(1, "value4");
fail();
}
catch (ClientException expected) {
// No-op.
}
try {
tx.commit();
fail();
}
catch (ClientException expected) {
// No-op.
}
}
assertEquals("value2", cache.get(1));
cache.put(1, "value5");
// Test failover.
ClientProcessorMXBean mxBean = getMxBean(ignite.name(), "Clients",
ClientListenerProcessor.class, ClientProcessorMXBean.class);
try (ClientTransaction tx = client.transactions().txStart()) {
cache.put(1, "value6");
mxBean.dropAllConnections();
try {
cache.put(1, "value7");
fail();
}
catch (ClientException expected) {
// No-op.
}
// Start new transaction doesn't recover cache operations on failed channel.
try (ClientTransaction tx1 = client.transactions().txStart()) {
fail();
}
catch (ClientException expected) {
// No-op.
}
try {
cache.get(1);
fail();
}
catch (ClientException expected) {
// No-op.
}
// Close outdated transaction doesn't recover cache operations on failed channel.
tx0.close();
try {
cache.get(1);
fail();
}
catch (ClientException expected) {
// No-op.
}
}
assertEquals("value5", cache.get(1));
// Test concurrent transactions in different connections.
try (IgniteClient client1 = Ignition.startClient(getClientConfiguration())) {
ClientCache<Integer, String> cache1 = client1.cache("cache");
try (ClientTransaction tx = client.transactions().txStart(OPTIMISTIC, READ_COMMITTED)) {
cache.put(0, "value8");
try (ClientTransaction tx1 = client1.transactions().txStart(OPTIMISTIC, READ_COMMITTED)) {
assertEquals("value8", cache.get(0));
assertEquals("value0", cache1.get(0));
cache1.put(1, "value9");
assertEquals("value5", cache.get(1));
assertEquals("value9", cache1.get(1));
tx1.commit();
assertEquals("value9", cache.get(1));
}
assertEquals("value0", cache1.get(0));
tx.commit();
assertEquals("value8", cache1.get(0));
}
}
// Check different types of cache operations.
try (ClientTransaction tx = client.transactions().txStart()) {
// Operations: put, putAll, putIfAbsent.
cache.put(2, "value10");
cache.putAll(F.asMap(1, "value11", 3, "value12"));
cache.putIfAbsent(4, "value13");
// Operations: get, getAll, getAndPut, getAndRemove, getAndReplace, getAndPutIfAbsent.
assertEquals("value10", cache.get(2));
assertEquals(F.asMap(1, "value11", 2, "value10"),
cache.getAll(new HashSet<>(Arrays.asList(1, 2))));
assertEquals("value13", cache.getAndPut(4, "value14"));
assertEquals("value14", cache.getAndPutIfAbsent(4, "valueDiscarded"));
assertEquals("value14", cache.get(4));
assertEquals("value14", cache.getAndReplace(4, "value15"));
assertEquals("value15", cache.getAndRemove(4));
assertNull(cache.getAndPutIfAbsent(10, "valuePutIfAbsent"));
assertEquals("valuePutIfAbsent", cache.get(10));
// Operations: contains.
assertTrue(cache.containsKey(2));
assertFalse(cache.containsKey(4));
assertTrue(cache.containsKeys(ImmutableSet.of(2, 10)));
assertFalse(cache.containsKeys(ImmutableSet.of(2, 4)));
// Operations: replace.
cache.put(4, "");
assertTrue(cache.replace(4, "value16"));
assertTrue(cache.replace(4, "value16", "value17"));
// Operations: remove, removeAll
cache.putAll(F.asMap(5, "", 6, ""));
assertTrue(cache.remove(5));
assertTrue(cache.remove(4, "value17"));
cache.removeAll(new HashSet<>(Arrays.asList(3, 6)));
assertFalse(cache.containsKey(3));
assertFalse(cache.containsKey(6));
tx.rollback();
}
assertEquals(F.asMap(0, "value8", 1, "value9"),
cache.getAll(new HashSet<>(Arrays.asList(0, 1))));
assertFalse(cache.containsKey(2));
// Test concurrent transactions started by different threads.
try (ClientTransaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
CyclicBarrier barrier = new CyclicBarrier(2);
cache.put(0, "value18");
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
try (ClientTransaction tx1 = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
cache.put(1, "value19");
barrier.await();
assertEquals("value8", cache.get(0));
barrier.await();
tx1.commit();
barrier.await();
assertEquals("value18", cache.get(0));
}
catch (InterruptedException | BrokenBarrierException ignore) {
// No-op.
}
});
barrier.await();
assertEquals("value9", cache.get(1));
barrier.await();
tx.commit();
barrier.await();
assertEquals("value19", cache.get(1));
fut.get();
}
// Test transaction usage by different threads.
try (ClientTransaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
cache.put(0, "value20");
GridTestUtils.runAsync(() -> {
// Implicit transaction started here.
cache.put(1, "value21");
assertEquals("value18", cache.get(0));
try {
// Transaction can't be commited by another thread.
tx.commit();
fail();
}
catch (ClientException expected) {
// No-op.
}
// Transaction can be closed by another thread.
tx.close();
assertEquals("value18", cache.get(0));
}).get();
assertEquals("value21", cache.get(1));
try {
// Transaction can't be commited after another thread close this transaction.
tx.commit();
fail();
}
catch (ClientException expected) {
// No-op.
}
assertEquals("value18", cache.get(0));
// Start implicit transaction after explicit transaction has been closed by another thread.
cache.put(0, "value22");
GridTestUtils.runAsync(() -> assertEquals("value22", cache.get(0))).get();
// New explicit transaction can be started after current transaction has been closed by another thread.
try (ClientTransaction tx1 = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
cache.put(0, "value23");
tx1.commit();
}
assertEquals("value23", cache.get(0));
}
// Test active transactions limit.
int txLimit = ignite.configuration().getClientConnectorConfiguration().getThinClientConfiguration()
.getMaxActiveTxPerConnection();
List<ClientTransaction> txs = new ArrayList<>(txLimit);
for (int i = 0; i < txLimit; i++) {
Thread t = new Thread(() -> txs.add(client.transactions().txStart()));
t.start();
t.join();
}
try (ClientTransaction ignored = client.transactions().txStart()) {
fail();
}
catch (ClientException e) {
ClientServerError cause = (ClientServerError)e.getCause();
assertEquals(ClientStatus.TX_LIMIT_EXCEEDED, cause.getCode());
}
for (ClientTransaction tx : txs)
tx.close();
// Test that new transaction can be started after commit of the previous one without closing.
ClientTransaction tx = client.transactions().txStart();
tx.commit();
tx = client.transactions().txStart();
tx.rollback();
// Test that new transaction can be started after rollback of the previous one without closing.
tx = client.transactions().txStart();
tx.commit();
// Test that implicit transaction started after commit of previous one without closing.
cache.put(0, "value24");
GridTestUtils.runAsync(() -> assertEquals("value24", cache.get(0))).get();
}
}
/**
* Test transactions.
*/
@Test
public void testTransactionsAsync() throws Exception {
try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
IgniteClient client = Ignition.startClient(getClientConfiguration())
) {
ClientCache<Integer, String> cache = client.createCache(new ClientCacheConfiguration()
.setName("cache")
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
);
cache.put(0, "value0");
cache.put(1, "value1");
// Test implicit rollback when transaction closed.
try (ClientTransaction tx = client.transactions().txStart()) {
cache.putAsync(1, "value2").get();
}
assertEquals("value1", cache.get(1));
// Test explicit rollback.
try (ClientTransaction tx = client.transactions().txStart()) {
cache.putAsync(1, "value2").get();
tx.rollback();
}
assertEquals("value1", cache.get(1));
// Test commit.
try (ClientTransaction tx = client.transactions().txStart()) {
cache.putAsync(1, "value2").get();
tx.commit();
}
assertEquals("value2", cache.get(1));
}
}
/**
* Test transactions with label.
*/
@Test
public void testTransactionsWithLabel() throws Exception {
try (IgniteEx ignite = (IgniteEx)Ignition.start(Config.getServerConfiguration());
IgniteClient client = Ignition.startClient(getClientConfiguration())
) {
ClientCache<Integer, String> cache = client.createCache(new ClientCacheConfiguration()
.setName("cache")
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
);
SystemView<TransactionView> txsView = ignite.context().systemView().view(TXS_MON_LIST);
cache.put(0, "value1");
try (ClientTransaction tx = client.transactions().withLabel("label").txStart()) {
cache.put(0, "value2");
assertEquals(1, F.size(txsView.iterator()));
TransactionView txv = txsView.iterator().next();
assertEquals("label", txv.label());
assertEquals("value2", cache.get(0));
}
assertEquals("value1", cache.get(0));
try (ClientTransaction tx = client.transactions().withLabel("label1").withLabel("label2").txStart()) {
cache.put(0, "value2");
assertEquals(1, F.size(txsView.iterator()));
TransactionView txv = txsView.iterator().next();
assertEquals("label2", txv.label());
tx.commit();
}
assertEquals("value2", cache.get(0));
// Test concurrent with label and without label transactions.
try (ClientTransaction tx = client.transactions().withLabel("label").txStart(PESSIMISTIC, READ_COMMITTED)) {
CyclicBarrier barrier = new CyclicBarrier(2);
cache.put(0, "value3");
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
try (ClientTransaction tx1 = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
cache.put(1, "value3");
barrier.await();
assertEquals("value2", cache.get(0));
barrier.await();
}
catch (InterruptedException | BrokenBarrierException ignore) {
// No-op.
}
});
barrier.await();
assertNull(cache.get(1));
assertEquals(1, F.size(txsView.iterator(), txv -> txv.label() == null));
assertEquals(1, F.size(txsView.iterator(), txv -> "label".equals(txv.label())));
barrier.await();
fut.get();
}
// Test nested transactions is not possible.
try (ClientTransaction tx = client.transactions().withLabel("label1").txStart()) {
try (ClientTransaction tx1 = client.transactions().txStart()) {
fail();
}
catch (ClientException expected) {
// No-op.
}
try (ClientTransaction tx1 = client.transactions().withLabel("label2").txStart()) {
fail();
}
catch (ClientException expected) {
// No-op.
}
}
}
}
/**
* Test cache with expire policy.
*/
@Test
public void testExpirePolicy() throws Exception {
long ttl = 600L;
int MAX_RETRIES = 5;
try (Ignite ignite = Ignition.start(Config.getServerConfiguration());
IgniteClient client = Ignition.startClient(getClientConfiguration())
) {
ClientCache<Integer, Object> cache = client.createCache(new ClientCacheConfiguration()
.setName("cache")
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
);
Duration dur = new Duration(TimeUnit.MILLISECONDS, ttl);
ClientCache<Integer, Object> cachePlcCreated = cache.withExpirePolicy(new CreatedExpiryPolicy(dur));
ClientCache<Integer, Object> cachePlcUpdated = cache.withExpirePolicy(new ModifiedExpiryPolicy(dur));
ClientCache<Integer, Object> cachePlcAccessed = cache.withExpirePolicy(new AccessedExpiryPolicy(dur));
for (int i = 0; i < MAX_RETRIES; i++) {
cache.clear();
long ts = U.currentTimeMillis();
cache.put(0, 0);
cachePlcCreated.put(1, 1);
cachePlcUpdated.put(2, 2);
cachePlcAccessed.put(3, 3);
U.sleep(ttl / 3 * 2);
boolean containsKey0 = cache.containsKey(0);
boolean containsKey1 = cache.containsKey(1);
boolean containsKey2 = cache.containsKey(2);
boolean containsKey3 = cache.containsKey(3);
if (U.currentTimeMillis() - ts >= ttl) // Retry if this block execution takes too long.
continue;
assertTrue(containsKey0);
assertTrue(containsKey1);
assertTrue(containsKey2);
assertTrue(containsKey3);
ts = U.currentTimeMillis();
cachePlcCreated.put(1, 2);
cachePlcCreated.get(1); // Update and access key with created expire policy.
cachePlcUpdated.put(2, 3); // Update key with modified expire policy.
cachePlcAccessed.get(3); // Access key with accessed expire policy.
U.sleep(ttl / 3 * 2);
containsKey0 = cache.containsKey(0);
containsKey1 = cache.containsKey(1);
containsKey2 = cache.containsKey(2);
containsKey3 = cache.containsKey(3);
if (U.currentTimeMillis() - ts >= ttl) // Retry if this block execution takes too long.
continue;
assertTrue(containsKey0);
assertFalse(containsKey1);
assertTrue(containsKey2);
assertTrue(containsKey3);
U.sleep(ttl / 3 * 2);
cachePlcUpdated.get(2); // Access key with updated expire policy.
U.sleep(ttl / 3 * 2);
assertTrue(cache.containsKey(0));
assertFalse(cache.containsKey(1));
assertFalse(cache.containsKey(2));
assertFalse(cache.containsKey(3));
// Expire policy, keep binary and transactional flags together.
ClientCache<Integer, Object> binCache = cachePlcCreated.withKeepBinary();
try (ClientTransaction tx = client.transactions().txStart()) {
binCache.put(4, new T2<>("test", "test"));
tx.commit();
}
assertTrue(binCache.get(4) instanceof BinaryObject);
assertFalse(cache.get(4) instanceof BinaryObject);
U.sleep(ttl / 3 * 4);
assertFalse(cache.containsKey(4));
return;
}
fail("Failed to check expire policy within " + MAX_RETRIES + " retries (block execution takes too long)");
}
}
/** */
private static ClientConfiguration getClientConfiguration() {
return new ClientConfiguration()
.setAddresses(Config.SERVER)
.setSendBufferSize(0)
.setReceiveBufferSize(0);
}
}