blob: 6b84da73ce865fc9787311d717c737ff0583ed65 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.tests;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Collection;
import java.util.Map;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.policies.RoundRobinPolicy;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory;
import org.apache.ignite.cache.store.cassandra.datasource.DataSource;
import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.tests.pojos.Person;
import org.apache.ignite.tests.pojos.PersonId;
import org.apache.ignite.tests.pojos.Product;
import org.apache.ignite.tests.pojos.ProductOrder;
import org.apache.ignite.tests.pojos.SimplePerson;
import org.apache.ignite.tests.pojos.SimplePersonId;
import org.apache.ignite.tests.utils.CacheStoreHelper;
import org.apache.ignite.tests.utils.CassandraAdminCredentials;
import org.apache.ignite.tests.utils.CassandraHelper;
import org.apache.ignite.tests.utils.TestsHelper;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.log4j.Logger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.core.io.ClassPathResource;
/**
* Unit tests for Ignite caches which utilize {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore}
* to store cache data in Cassandra tables.
*/
public class IgnitePersistentStoreTest {
/** Logger used by the tests in this class to report their progress. */
private static final Logger LOGGER = Logger.getLogger(IgnitePersistentStoreTest.class.getName());
/**
 * One-time preparation executed before the first test: starts the embedded Cassandra
 * instance when {@link CassandraHelper#useEmbeddedCassandra()} says so, checks both the
 * admin and the regular connection, and drops the test keyspaces of earlier sessions.
 */
@BeforeClass
public static void setUpClass() {
    startEmbeddedCassandraIfRequested();

    LOGGER.info("Testing admin connection to Cassandra");
    CassandraHelper.testAdminConnection();

    LOGGER.info("Testing regular connection to Cassandra");
    CassandraHelper.testRegularConnection();

    LOGGER.info("Dropping all artifacts from previous tests execution session");
    CassandraHelper.dropTestKeyspaces();

    LOGGER.info("Start tests execution");
}

/**
 * Starts the embedded Cassandra instance if the helper reports it should be used.
 * Any failure is rethrown as an unchecked exception carrying the original cause.
 */
private static void startEmbeddedCassandraIfRequested() {
    if (!CassandraHelper.useEmbeddedCassandra())
        return;

    try {
        CassandraHelper.startEmbeddedCassandra(LOGGER);
    }
    catch (Throwable startErr) {
        throw new RuntimeException("Failed to start embedded Cassandra instance", startErr);
    }
}
/**
 * One-time cleanup executed after the last test: drops the test keyspaces, releases the
 * Cassandra resources held by {@link CassandraHelper} and, when the embedded instance is
 * in use, stops it.
 * <p>
 * Each step runs in its own {@code finally} so that a failure while dropping keyspaces or
 * releasing resources does not leave the embedded Cassandra instance running.
 */
@AfterClass
public static void tearDownClass() {
    try {
        CassandraHelper.dropTestKeyspaces();
    }
    finally {
        try {
            CassandraHelper.releaseCassandraResources();
        }
        finally {
            if (CassandraHelper.useEmbeddedCassandra()) {
                try {
                    CassandraHelper.stopEmbeddedCassandra();
                }
                catch (Throwable e) {
                    // Best effort: a failed stop must not mask an earlier cleanup failure.
                    LOGGER.error("Failed to stop embedded Cassandra instance", e);
                }
            }
        }
    }
}
/**
 * Exercises caches "cache1" (Long to Long) and "cache2" (String to String) started from the
 * {@code primitive} Ignite configuration: writes single and bulk values, restarts the node,
 * reads the values back, compares them with what was written and finally removes them.
 */
@Test
public void primitiveStrategyTest() {
    Ignition.stopAll(true);

    Map<Long, Long> longMap = TestsHelper.generateLongsMap();
    Map<String, String> strMap = TestsHelper.generateStringsMap();

    LOGGER.info("Running PRIMITIVE strategy write tests");

    try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/primitive/ignite-config.xml")) {
        IgniteCache<Long, Long> longCache = ignite.getOrCreateCache(new CacheConfiguration<Long, Long>("cache1"));
        IgniteCache<String, String> strCache = ignite.getOrCreateCache(new CacheConfiguration<String, String>("cache2"));

        LOGGER.info("Running single operation write tests");

        longCache.put(1L, 1L);
        strCache.put("1", "1");

        LOGGER.info("Single operation write tests passed");

        LOGGER.info("Running bulk operation write tests");

        longCache.putAll(longMap);
        strCache.putAll(strMap);

        LOGGER.info("Bulk operation write tests passed");
    }

    LOGGER.info("PRIMITIVE strategy write tests passed");

    // Restart the node so the reads below cannot be served from the previous node's memory.
    Ignition.stopAll(true);

    try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/primitive/ignite-config.xml")) {
        LOGGER.info("Running PRIMITIVE strategy read tests");

        IgniteCache<Long, Long> longCache = ignite.getOrCreateCache(new CacheConfiguration<Long, Long>("cache1"));
        IgniteCache<String, String> strCache = ignite.getOrCreateCache(new CacheConfiguration<String, String>("cache2"));

        LOGGER.info("Running single operation read tests");

        // A missing value is reported with the same diagnostic instead of a bare NPE.
        Long longVal = longCache.get(1L);

        if (longVal == null || !longVal.equals(longMap.get(1L)))
            throw new RuntimeException("Long value was incorrectly deserialized from Cassandra");

        String strVal = strCache.get("1");

        if (strVal == null || !strVal.equals(strMap.get("1")))
            throw new RuntimeException("String value was incorrectly deserialized from Cassandra");

        LOGGER.info("Single operation read tests passed");

        LOGGER.info("Running bulk operation read tests");

        Map<Long, Long> longMap1 = longCache.getAll(longMap.keySet());

        if (!TestsHelper.checkMapsEqual(longMap, longMap1))
            throw new RuntimeException("Long values batch was incorrectly deserialized from Cassandra");

        Map<String, String> strMap1 = strCache.getAll(strMap.keySet());

        if (!TestsHelper.checkMapsEqual(strMap, strMap1))
            throw new RuntimeException("String values batch was incorrectly deserialized from Cassandra");

        LOGGER.info("Bulk operation read tests passed");

        LOGGER.info("PRIMITIVE strategy read tests passed");

        LOGGER.info("Running PRIMITIVE strategy delete tests");

        longCache.remove(1L);
        longCache.removeAll(longMap.keySet());

        strCache.remove("1");
        strCache.removeAll(strMap.keySet());

        LOGGER.info("PRIMITIVE strategy delete tests passed");
    }
}
/**
 * Exercises caches "cache1" (Long to Long) and "cache2" (Long to {@link Person}) started from
 * the {@code blob} Ignite configuration: writes single and bulk values, restarts the node,
 * reads the values back, compares them with what was written and finally removes them.
 */
@Test
public void blobStrategyTest() {
    Ignition.stopAll(true);

    Map<Long, Long> longMap = TestsHelper.generateLongsMap();
    Map<Long, Person> personMap = TestsHelper.generateLongsPersonsMap();

    LOGGER.info("Running BLOB strategy write tests");

    try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/blob/ignite-config.xml")) {
        IgniteCache<Long, Long> longCache = ignite.getOrCreateCache(new CacheConfiguration<Long, Long>("cache1"));
        IgniteCache<Long, Person> personCache = ignite.getOrCreateCache(new CacheConfiguration<Long, Person>("cache2"));

        LOGGER.info("Running single operation write tests");

        longCache.put(1L, 1L);
        personCache.put(1L, TestsHelper.generateRandomPerson(1L));

        LOGGER.info("Single operation write tests passed");

        LOGGER.info("Running bulk operation write tests");

        longCache.putAll(longMap);
        personCache.putAll(personMap);

        LOGGER.info("Bulk operation write tests passed");
    }

    LOGGER.info("BLOB strategy write tests passed");

    // Restart the node so the reads below cannot be served from the previous node's memory.
    Ignition.stopAll(true);

    try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/blob/ignite-config.xml")) {
        LOGGER.info("Running BLOB strategy read tests");

        IgniteCache<Long, Long> longCache = ignite.getOrCreateCache(new CacheConfiguration<Long, Long>("cache1"));
        IgniteCache<Long, Person> personCache = ignite.getOrCreateCache(new CacheConfiguration<Long, Person>("cache2"));

        LOGGER.info("Running single operation read tests");

        // A missing value is reported with the same diagnostic instead of a bare NPE.
        Long longVal = longCache.get(1L);

        if (longVal == null || !longVal.equals(longMap.get(1L)))
            throw new RuntimeException("Long value was incorrectly deserialized from Cassandra");

        Person person = personCache.get(1L);

        if (person == null || !person.equals(personMap.get(1L)))
            throw new RuntimeException("Person value was incorrectly deserialized from Cassandra");

        LOGGER.info("Single operation read tests passed");

        LOGGER.info("Running bulk operation read tests");

        Map<Long, Long> longMap1 = longCache.getAll(longMap.keySet());

        if (!TestsHelper.checkMapsEqual(longMap, longMap1))
            throw new RuntimeException("Long values batch was incorrectly deserialized from Cassandra");

        Map<Long, Person> personMap1 = personCache.getAll(personMap.keySet());

        if (!TestsHelper.checkPersonMapsEqual(personMap, personMap1, false))
            throw new RuntimeException("Person values batch was incorrectly deserialized from Cassandra");

        LOGGER.info("Bulk operation read tests passed");

        LOGGER.info("BLOB strategy read tests passed");

        LOGGER.info("Running BLOB strategy delete tests");

        longCache.remove(1L);
        longCache.removeAll(longMap.keySet());

        personCache.remove(1L);
        personCache.removeAll(personMap.keySet());

        LOGGER.info("BLOB strategy delete tests passed");
    }
}
/**
 * Stores a {@link PojoPerson} in cache "cache2" of the {@code loadall_blob} configuration,
 * restarts the node, calls {@code loadCache} and reads the entry back.
 * <p>
 * Checks use JUnit assertions rather than the {@code assert} keyword so they are enforced
 * even when the JVM runs without {@code -ea}, and the value read back after
 * {@code loadCache} is verified instead of being discarded.
 */
@Test
public void blobBinaryLoadCacheTest() {
    Ignition.stopAll(true);

    try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml")) {
        IgniteCache<Long, PojoPerson> personCache = ignite.getOrCreateCache("cache2");

        Assert.assertTrue("Binary marshaller is expected to be configured",
            ignite.configuration().getMarshaller() instanceof BinaryMarshaller);

        personCache.put(1L, new PojoPerson(1, "name"));

        Assert.assertTrue("Value read in keep-binary mode is expected to be a BinaryObject",
            personCache.withKeepBinary().get(1L) instanceof BinaryObject);
    }

    Ignition.stopAll(true);

    try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml")) {
        IgniteCache<Long, PojoPerson> personCache = ignite.getOrCreateCache("cache2");

        personCache.loadCache(null, null);

        PojoPerson person = personCache.get(1L);

        Assert.assertNotNull("Person was not loaded from Cassandra", person);
        Assert.assertEquals("Person id was incorrectly deserialized from Cassandra", 1, person.getId());
        Assert.assertEquals("Person name was incorrectly deserialized from Cassandra", "name", person.getName());

        LOGGER.info("loadCache tests passed");
    }
}
/**
 * Exercises the six caches ("cache1".."cache4", "product", "order") of the {@code pojo}
 * Ignite configuration: writes single and bulk values, restarts the node, reads the values
 * back, compares them with what was written and finally removes them.
 * <p>
 * Values read from "cache1" and "cache2" are compared with {@code equalsPrimitiveFields},
 * values from "cache3" and "cache4" with {@code equals}.
 */
@Test
public void pojoStrategyTest() {
    Ignition.stopAll(true);

    LOGGER.info("Running POJO strategy write tests");

    Map<Long, Person> personMap1 = TestsHelper.generateLongsPersonsMap();
    Map<PersonId, Person> personMap2 = TestsHelper.generatePersonIdsPersonsMap();
    Map<Long, Product> productsMap = TestsHelper.generateProductsMap();
    Map<Long, ProductOrder> ordersMap = TestsHelper.generateOrdersMap();

    Product product = TestsHelper.generateRandomProduct(-1L);
    ProductOrder order = TestsHelper.generateRandomOrder(-1L);

    try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/pojo/ignite-config.xml")) {
        IgniteCache<Long, Person> personCache1 = ignite.getOrCreateCache(new CacheConfiguration<Long, Person>("cache1"));
        IgniteCache<PersonId, Person> personCache2 = ignite.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache2"));
        IgniteCache<PersonId, Person> personCache3 = ignite.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache3"));
        IgniteCache<PersonId, Person> personCache4 = ignite.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache4"));
        IgniteCache<Long, Product> productCache = ignite.getOrCreateCache(new CacheConfiguration<Long, Product>("product"));
        IgniteCache<Long, ProductOrder> orderCache = ignite.getOrCreateCache(new CacheConfiguration<Long, ProductOrder>("order"));

        LOGGER.info("Running single operation write tests");

        personCache1.put(1L, TestsHelper.generateRandomPerson(1L));

        PersonId id = TestsHelper.generateRandomPersonId();
        personCache2.put(id, TestsHelper.generateRandomPerson(id.getPersonNumber()));

        id = TestsHelper.generateRandomPersonId();
        personCache3.put(id, TestsHelper.generateRandomPerson(id.getPersonNumber()));
        personCache4.put(id, TestsHelper.generateRandomPerson(id.getPersonNumber()));

        productCache.put(product.getId(), product);
        orderCache.put(order.getId(), order);

        LOGGER.info("Single operation write tests passed");

        LOGGER.info("Running bulk operation write tests");

        personCache1.putAll(personMap1);
        personCache2.putAll(personMap2);
        personCache3.putAll(personMap2);
        personCache4.putAll(personMap2);
        productCache.putAll(productsMap);
        orderCache.putAll(ordersMap);

        LOGGER.info("Bulk operation write tests passed");
    }

    LOGGER.info("POJO strategy write tests passed");

    // Restart the node so the reads below cannot be served from the previous node's memory.
    Ignition.stopAll(true);

    try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/pojo/ignite-config.xml")) {
        LOGGER.info("Running POJO strategy read tests");

        IgniteCache<Long, Person> personCache1 = ignite.getOrCreateCache(new CacheConfiguration<Long, Person>("cache1"));
        IgniteCache<PersonId, Person> personCache2 = ignite.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache2"));
        IgniteCache<PersonId, Person> personCache3 = ignite.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache3"));
        IgniteCache<PersonId, Person> personCache4 = ignite.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache4"));
        IgniteCache<Long, Product> productCache = ignite.getOrCreateCache(new CacheConfiguration<Long, Product>("product"));
        IgniteCache<Long, ProductOrder> orderCache = ignite.getOrCreateCache(new CacheConfiguration<Long, ProductOrder>("order"));

        LOGGER.info("Running single operation read tests");

        // Every loaded value is null-checked first so that a missing row is reported
        // with the regular diagnostic instead of a bare NullPointerException.
        Person person = personCache1.get(1L);

        if (person == null || !person.equalsPrimitiveFields(personMap1.get(1L)))
            throw new RuntimeException("Person value was incorrectly deserialized from Cassandra");

        PersonId id = personMap2.keySet().iterator().next();

        person = personCache2.get(id);

        if (person == null || !person.equalsPrimitiveFields(personMap2.get(id)))
            throw new RuntimeException("Person value was incorrectly deserialized from Cassandra");

        person = personCache3.get(id);

        if (person == null || !person.equals(personMap2.get(id)))
            throw new RuntimeException("Person value was incorrectly deserialized from Cassandra");

        person = personCache4.get(id);

        if (person == null || !person.equals(personMap2.get(id)))
            throw new RuntimeException("Person value was incorrectly deserialized from Cassandra");

        Product product1 = productCache.get(product.getId());

        if (!product.equals(product1))
            throw new RuntimeException("Product value was incorrectly deserialized from Cassandra");

        ProductOrder order1 = orderCache.get(order.getId());

        if (!order.equals(order1))
            throw new RuntimeException("Order value was incorrectly deserialized from Cassandra");

        LOGGER.info("Single operation read tests passed");

        LOGGER.info("Running bulk operation read tests");

        Map<Long, Person> persons1 = personCache1.getAll(personMap1.keySet());

        if (!TestsHelper.checkPersonMapsEqual(persons1, personMap1, true))
            throw new RuntimeException("Persons values batch was incorrectly deserialized from Cassandra");

        Map<PersonId, Person> persons2 = personCache2.getAll(personMap2.keySet());

        if (!TestsHelper.checkPersonMapsEqual(persons2, personMap2, true))
            throw new RuntimeException("Person values batch was incorrectly deserialized from Cassandra");

        Map<PersonId, Person> persons3 = personCache3.getAll(personMap2.keySet());

        if (!TestsHelper.checkPersonMapsEqual(persons3, personMap2, false))
            throw new RuntimeException("Person values batch was incorrectly deserialized from Cassandra");

        Map<PersonId, Person> persons4 = personCache4.getAll(personMap2.keySet());

        if (!TestsHelper.checkPersonMapsEqual(persons4, personMap2, false))
            throw new RuntimeException("Person values batch was incorrectly deserialized from Cassandra");

        Map<Long, Product> productsMap1 = productCache.getAll(productsMap.keySet());

        if (!TestsHelper.checkProductMapsEqual(productsMap, productsMap1))
            throw new RuntimeException("Product values batch was incorrectly deserialized from Cassandra");

        Map<Long, ProductOrder> ordersMap1 = orderCache.getAll(ordersMap.keySet());

        if (!TestsHelper.checkOrderMapsEqual(ordersMap, ordersMap1))
            throw new RuntimeException("Order values batch was incorrectly deserialized from Cassandra");

        LOGGER.info("Bulk operation read tests passed");

        LOGGER.info("POJO strategy read tests passed");

        LOGGER.info("Running POJO strategy delete tests");

        personCache1.remove(1L);
        personCache1.removeAll(personMap1.keySet());

        personCache2.remove(id);
        personCache2.removeAll(personMap2.keySet());

        personCache3.remove(id);
        personCache3.removeAll(personMap2.keySet());

        personCache4.remove(id);
        personCache4.removeAll(personMap2.keySet());

        productCache.remove(product.getId());
        productCache.removeAll(productsMap.keySet());

        orderCache.remove(order.getId());
        orderCache.removeAll(ordersMap.keySet());

        LOGGER.info("POJO strategy delete tests passed");
    }
}
/**
 * Exercises caches "cache5" and "cache6" ({@link SimplePersonId} to {@link SimplePerson}) of
 * the {@code pojo} Ignite configuration: writes single and bulk values, restarts the node,
 * reads the values back, compares them with what was written and finally removes them.
 * <p>
 * Values read from "cache5" are compared with {@code equalsPrimitiveFields}, values from
 * "cache6" with {@code equals}.
 */
@Test
public void pojoStrategySimpleObjectsTest() {
    Ignition.stopAll(true);

    LOGGER.info("Running POJO strategy write tests for simple objects");

    Map<SimplePersonId, SimplePerson> personMap5 = TestsHelper.generateSimplePersonIdsPersonsMap();
    Map<SimplePersonId, SimplePerson> personMap6 = TestsHelper.generateSimplePersonIdsPersonsMap();

    try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/pojo/ignite-config.xml")) {
        IgniteCache<SimplePersonId, SimplePerson> personCache5 =
            ignite.getOrCreateCache(new CacheConfiguration<SimplePersonId, SimplePerson>("cache5"));
        IgniteCache<SimplePersonId, SimplePerson> personCache6 =
            ignite.getOrCreateCache(new CacheConfiguration<SimplePersonId, SimplePerson>("cache6"));

        LOGGER.info("Running single operation write tests");

        SimplePersonId id = TestsHelper.generateRandomSimplePersonId();

        personCache5.put(id, TestsHelper.generateRandomSimplePerson(id.personNum));
        personCache6.put(id, TestsHelper.generateRandomSimplePerson(id.personNum));

        LOGGER.info("Single operation write tests passed");

        LOGGER.info("Running bulk operation write tests");

        personCache5.putAll(personMap5);
        personCache6.putAll(personMap6);

        LOGGER.info("Bulk operation write tests passed");
    }

    LOGGER.info("POJO strategy write tests for simple objects passed");

    // Restart the node so the reads below cannot be served from the previous node's memory.
    Ignition.stopAll(true);

    try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/pojo/ignite-config.xml")) {
        LOGGER.info("Running POJO strategy read tests for simple objects");

        IgniteCache<SimplePersonId, SimplePerson> personCache5 =
            ignite.getOrCreateCache(new CacheConfiguration<SimplePersonId, SimplePerson>("cache5"));
        IgniteCache<SimplePersonId, SimplePerson> personCache6 =
            ignite.getOrCreateCache(new CacheConfiguration<SimplePersonId, SimplePerson>("cache6"));

        LOGGER.info("Running single operation read tests");

        // Loaded values are null-checked first so that a missing row is reported with
        // the regular diagnostic instead of a bare NullPointerException.
        SimplePersonId id = personMap5.keySet().iterator().next();

        SimplePerson person = personCache5.get(id);

        if (person == null || !person.equalsPrimitiveFields(personMap5.get(id)))
            throw new RuntimeException("SimplePerson value was incorrectly deserialized from Cassandra");

        id = personMap6.keySet().iterator().next();

        person = personCache6.get(id);

        if (person == null || !person.equals(personMap6.get(id)))
            throw new RuntimeException("SimplePerson value was incorrectly deserialized from Cassandra");

        LOGGER.info("Single operation read tests passed");

        LOGGER.info("Running bulk operation read tests");

        Map<SimplePersonId, SimplePerson> persons5 = personCache5.getAll(personMap5.keySet());

        if (!TestsHelper.checkSimplePersonMapsEqual(persons5, personMap5, true))
            throw new RuntimeException("SimplePerson values batch was incorrectly deserialized from Cassandra");

        Map<SimplePersonId, SimplePerson> persons6 = personCache6.getAll(personMap6.keySet());

        if (!TestsHelper.checkSimplePersonMapsEqual(persons6, personMap6, false))
            throw new RuntimeException("SimplePerson values batch was incorrectly deserialized from Cassandra");

        LOGGER.info("Bulk operation read tests passed");

        LOGGER.info("POJO strategy read tests for simple objects passed");

        LOGGER.info("Running POJO strategy delete tests for simple objects");

        personCache5.remove(id);
        personCache5.removeAll(personMap5.keySet());

        personCache6.remove(id);
        personCache6.removeAll(personMap6.keySet());

        LOGGER.info("POJO strategy delete tests for simple objects passed");
    }
}
/**
 * Drops the test keyspaces, starts a node from the {@code pojo} configuration and runs the
 * transactional scenario once for every concurrency / isolation pair: all three isolation
 * levels with OPTIMISTIC concurrency first, then the same three with PESSIMISTIC.
 */
@Test
public void pojoStrategyTransactionTest() {
    CassandraHelper.dropTestKeyspaces();

    Ignition.stopAll(true);

    // Explicit arrays keep the execution order fixed regardless of enum declaration order.
    TransactionConcurrency[] concurrencyModes = {
        TransactionConcurrency.OPTIMISTIC,
        TransactionConcurrency.PESSIMISTIC
    };

    TransactionIsolation[] isolationLevels = {
        TransactionIsolation.READ_COMMITTED,
        TransactionIsolation.REPEATABLE_READ,
        TransactionIsolation.SERIALIZABLE
    };

    try (Ignite node = Ignition.start("org/apache/ignite/tests/persistence/pojo/ignite-config.xml")) {
        for (TransactionConcurrency mode : concurrencyModes) {
            for (TransactionIsolation level : isolationLevels)
                pojoStrategyTransactionTest(node, mode, level);
        }
    }
}
/**
 * Fills a Cassandra table through a standalone cache store and then checks that cache
 * "cache3" can be populated from it with {@code loadCache} in three ways: by a CQL string,
 * by a {@link SimpleStatement} and without arguments.
 */
@Test
public void loadCacheTest() {
    Ignition.stopAll(true);

    LOGGER.info("Running loadCache test");

    LOGGER.info("Filling Cassandra table with test data");

    ClassPathResource settingsRsrc =
        new ClassPathResource("org/apache/ignite/tests/persistence/pojo/persistence-settings-3.xml");

    CacheStore personStore =
        CacheStoreHelper.createCacheStore("personTypes", settingsRsrc, CassandraHelper.getAdminDataSrc());

    Collection<CacheEntryImpl<PersonId, Person>> testEntries = TestsHelper.generatePersonIdsPersonsEntries();

    //noinspection unchecked
    personStore.writeAll(testEntries);

    LOGGER.info("Cassandra table filled with test data");

    LOGGER.info("Running loadCache test");

    try (Ignite node = Ignition.start("org/apache/ignite/tests/persistence/pojo/ignite-config.xml")) {
        IgniteCache<PersonId, Person> cache =
            node.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache3"));

        int initialSize = cache.size(CachePeekMode.ALL);

        LOGGER.info("Initial cache size " + initialSize);

        LOGGER.info("Loading cache data from Cassandra table");

        String qry = "select * from test1.pojo_test3 limit 3";

        // 1. Load by a CQL query string.
        cache.loadCache(null, qry);

        int sizeByQuery = cache.size(CachePeekMode.ALL);

        Assert.assertEquals("Cache data was incorrectly loaded from Cassandra table by '" + qry + "'", 3, sizeByQuery);

        cache.clear();

        // 2. Load by a driver statement wrapping the same query.
        cache.loadCache(null, new SimpleStatement(qry));

        int sizeByStatement = cache.size(CachePeekMode.ALL);

        Assert.assertEquals("Cache data was incorrectly loaded from Cassandra table by statement", 3, sizeByStatement);

        cache.clear();

        // 3. Load without arguments.
        cache.loadCache(null);

        int sizeByDefault = cache.size(CachePeekMode.ALL);

        Assert.assertEquals("Cache data was incorrectly loaded from Cassandra. " +
                "Expected number of records is " + TestsHelper.getBulkOperationSize() +
                ", but loaded number of records is " + sizeByDefault,
            TestsHelper.getBulkOperationSize(), sizeByDefault);

        LOGGER.info("Cache data loaded from Cassandra table");
    }

    LOGGER.info("loadCache test passed");
}
/**
 * Runs the transactional write/delete scenario against the "product" and "order" caches for
 * one concurrency / isolation combination.
 * <p>
 * For each of the four phases (single write, bulk write, single delete, bulk delete) the
 * method performs the cache operation inside a transaction, checks through separately created
 * cache stores that Cassandra is unchanged before commit, commits, and then checks that
 * Cassandra reflects the change.
 *
 * @param ignite Started Ignite node providing the caches and transactions.
 * @param concurrency Transaction concurrency mode to use.
 * @param isolation Transaction isolation level to use.
 * @throws RuntimeException If any of the checks fails.
 */
@SuppressWarnings("unchecked")
private void pojoStrategyTransactionTest(Ignite ignite, TransactionConcurrency concurrency,
    TransactionIsolation isolation) {
    LOGGER.info("-----------------------------------------------------------------------------------");
    LOGGER.info("Running POJO transaction tests using " + concurrency +
        " concurrency and " + isolation + " isolation level");
    LOGGER.info("-----------------------------------------------------------------------------------");

    // Standalone stores used to look at what is persisted in Cassandra.
    CacheStore productStore = CacheStoreHelper.createCacheStore("product",
        new ClassPathResource("org/apache/ignite/tests/persistence/pojo/product.xml"),
        CassandraHelper.getAdminDataSrc());

    CacheStore orderStore = CacheStoreHelper.createCacheStore("order",
        new ClassPathResource("org/apache/ignite/tests/persistence/pojo/order.xml"),
        CassandraHelper.getAdminDataSrc());

    Map<Long, Product> productsMap = TestsHelper.generateProductsMap(5);
    Map<Long, Product> productsMap1;
    Map<Long, ProductOrder> ordersMap = TestsHelper.generateOrdersMap(5);
    Map<Long, ProductOrder> ordersMap1;

    Product product = TestsHelper.generateRandomProduct(-1L);
    ProductOrder order = TestsHelper.generateRandomOrder(-1L, -1L, Instant.now());

    IgniteTransactions txs = ignite.transactions();

    IgniteCache<Long, Product> productCache = ignite.getOrCreateCache(new CacheConfiguration<Long, Product>("product"));
    IgniteCache<Long, ProductOrder> orderCache = ignite.getOrCreateCache(new CacheConfiguration<Long, ProductOrder>("order"));

    LOGGER.info("Running POJO strategy write tests");

    LOGGER.info("Running single operation write tests");

    Transaction tx = txs.txStart(concurrency, isolation);

    try {
        productCache.put(product.getId(), product);
        orderCache.put(order.getId(), order);

        if (productStore.load(product.getId()) != null || orderStore.load(order.getId()) != null) {
            throw new RuntimeException("Single write operation test failed. Transaction wasn't committed yet, but " +
                "objects were already persisted into Cassandra");
        }

        Map<Long, Product> products = (Map<Long, Product>)productStore.loadAll(productsMap.keySet());
        Map<Long, ProductOrder> orders = (Map<Long, ProductOrder>)orderStore.loadAll(ordersMap.keySet());

        if ((products != null && !products.isEmpty()) || (orders != null && !orders.isEmpty())) {
            throw new RuntimeException("Single write operation test failed. Transaction wasn't committed yet, but " +
                "objects were already persisted into Cassandra");
        }

        tx.commit();
    }
    finally {
        U.closeQuiet(tx);
    }

    Product product1 = (Product)productStore.load(product.getId());
    ProductOrder order1 = (ProductOrder)orderStore.load(order.getId());

    if (product1 == null || order1 == null) {
        throw new RuntimeException("Single write operation test failed. Transaction was committed, but " +
            "no objects were persisted into Cassandra");
    }

    if (!product.equals(product1) || !order.equals(order1)) {
        throw new RuntimeException("Single write operation test failed. Transaction was committed, but " +
            "objects were incorrectly persisted/loaded to/from Cassandra");
    }

    LOGGER.info("Single operation write tests passed");

    LOGGER.info("Running bulk operation write tests");

    tx = txs.txStart(concurrency, isolation);

    try {
        productCache.putAll(productsMap);
        orderCache.putAll(ordersMap);

        productsMap1 = (Map<Long, Product>)productStore.loadAll(productsMap.keySet());
        ordersMap1 = (Map<Long, ProductOrder>)orderStore.loadAll(ordersMap.keySet());

        if ((productsMap1 != null && !productsMap1.isEmpty()) || (ordersMap1 != null && !ordersMap1.isEmpty())) {
            throw new RuntimeException("Bulk write operation test failed. Transaction wasn't committed yet, but " +
                "objects were already persisted into Cassandra");
        }

        tx.commit();
    }
    finally {
        U.closeQuiet(tx);
    }

    productsMap1 = (Map<Long, Product>)productStore.loadAll(productsMap.keySet());
    ordersMap1 = (Map<Long, ProductOrder>)orderStore.loadAll(ordersMap.keySet());

    if (productsMap1 == null || productsMap1.isEmpty() || ordersMap1 == null || ordersMap1.isEmpty()) {
        throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
            "no objects were persisted into Cassandra");
    }

    if (productsMap1.size() < productsMap.size() || ordersMap1.size() < ordersMap.size()) {
        throw new RuntimeException("Bulk write operation test failed. There were committed less objects " +
            "into Cassandra than expected");
    }

    if (productsMap1.size() > productsMap.size() || ordersMap1.size() > ordersMap.size()) {
        throw new RuntimeException("Bulk write operation test failed. There were committed more objects " +
            "into Cassandra than expected");
    }

    // Dedicated locals are used here so that 'product' and 'order' keep referring to the
    // single objects written above; the delete phase below relies on their ids.
    for (Map.Entry<Long, Product> entry : productsMap.entrySet()) {
        Product loadedProduct = productsMap1.get(entry.getKey());

        if (!entry.getValue().equals(loadedProduct)) {
            throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
                "some objects were incorrectly persisted/loaded to/from Cassandra");
        }
    }

    for (Map.Entry<Long, ProductOrder> entry : ordersMap.entrySet()) {
        ProductOrder loadedOrder = ordersMap1.get(entry.getKey());

        if (!entry.getValue().equals(loadedOrder)) {
            throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
                "some objects were incorrectly persisted/loaded to/from Cassandra");
        }
    }

    LOGGER.info("Bulk operation write tests passed");

    LOGGER.info("POJO strategy write tests passed");

    LOGGER.info("Running POJO strategy delete tests");

    LOGGER.info("Running single delete tests");

    tx = txs.txStart(concurrency, isolation);

    try {
        // Remove exactly the entries written by the single write phase.
        productCache.remove(product.getId());
        orderCache.remove(order.getId());

        if (productStore.load(product.getId()) == null || orderStore.load(order.getId()) == null) {
            throw new RuntimeException("Single delete operation test failed. Transaction wasn't committed yet, but " +
                "objects were already deleted from Cassandra");
        }

        tx.commit();
    }
    finally {
        U.closeQuiet(tx);
    }

    if (productStore.load(product.getId()) != null || orderStore.load(order.getId()) != null) {
        throw new RuntimeException("Single delete operation test failed. Transaction was committed, but " +
            "objects were not deleted from Cassandra");
    }

    LOGGER.info("Single delete tests passed");

    LOGGER.info("Running bulk delete tests");

    tx = txs.txStart(concurrency, isolation);

    try {
        productCache.removeAll(productsMap.keySet());
        orderCache.removeAll(ordersMap.keySet());

        productsMap1 = (Map<Long, Product>)productStore.loadAll(productsMap.keySet());
        ordersMap1 = (Map<Long, ProductOrder>)orderStore.loadAll(ordersMap.keySet());

        // Null results are treated like missing rows, consistent with the other checks
        // in this method, instead of failing with a NullPointerException.
        if (productsMap1 == null || productsMap1.size() != productsMap.size() ||
            ordersMap1 == null || ordersMap1.size() != ordersMap.size()) {
            throw new RuntimeException("Bulk delete operation test failed. Transaction wasn't committed yet, but " +
                "objects were already deleted from Cassandra");
        }

        tx.commit();
    }
    finally {
        U.closeQuiet(tx);
    }

    productsMap1 = (Map<Long, Product>)productStore.loadAll(productsMap.keySet());
    ordersMap1 = (Map<Long, ProductOrder>)orderStore.loadAll(ordersMap.keySet());

    if ((productsMap1 != null && !productsMap1.isEmpty()) || (ordersMap1 != null && !ordersMap1.isEmpty())) {
        throw new RuntimeException("Bulk delete operation test failed. Transaction was committed, but " +
            "objects were not deleted from Cassandra");
    }

    LOGGER.info("Bulk delete tests passed");

    LOGGER.info("POJO strategy delete tests passed");

    LOGGER.info("-----------------------------------------------------------------------------------");
    LOGGER.info("Passed POJO transaction tests for " + concurrency +
        " concurrency and " + isolation + " isolation level");
    LOGGER.info("-----------------------------------------------------------------------------------");
}
/**
 * Runs the write / restart / read / delete cycle against cache "cache1" of a node whose
 * configuration is built programmatically by {@link #igniteConfig()}, i.e. with
 * {@link KeyValuePersistenceSettings} passed directly to the store factory rather than
 * declared as a bean.
 *
 * @throws IOException If the node configuration cannot be built.
 */
@Test
public void directPersistenceConfigTest() throws IOException {
    Map<PersonId, Person> expectedPersons = TestsHelper.generatePersonIdsPersonsMap();

    PersonId singleId = TestsHelper.generateRandomPersonId();
    Person expectedPerson = TestsHelper.generateRandomPerson(singleId.getPersonNumber());

    IgniteConfiguration nodeCfg = igniteConfig();

    Ignition.stopAll(true);

    try (Ignite node = Ignition.start(nodeCfg)) {
        LOGGER.info("Running POJO strategy write tests");

        IgniteCache<PersonId, Person> personCache = node.getOrCreateCache("cache1");

        LOGGER.info("Running single operation write tests");

        // The first value is immediately overwritten by the one verified later.
        personCache.put(singleId, TestsHelper.generateRandomPerson(singleId.getPersonNumber()));
        personCache.put(singleId, expectedPerson);

        LOGGER.info("Single operation write tests passed");

        LOGGER.info("Running bulk operation write tests");

        personCache.putAll(expectedPersons);

        LOGGER.info("Bulk operation write tests passed");
    }

    LOGGER.info("POJO strategy write tests passed");

    Ignition.stopAll(true);

    try (Ignite node = Ignition.start(nodeCfg)) {
        LOGGER.info("Running POJO strategy read tests");

        IgniteCache<PersonId, Person> personCache = node.getOrCreateCache("cache1");

        Person loadedPerson = personCache.get(singleId);

        boolean singleMatches = expectedPerson.equals(loadedPerson);

        if (!singleMatches)
            throw new RuntimeException("Person value was incorrectly deserialized from Cassandra");

        LOGGER.info("Single operation read tests passed");

        LOGGER.info("Running bulk operation read tests");

        Map<PersonId, Person> loadedPersons = personCache.getAll(expectedPersons.keySet());

        boolean bulkMatches = TestsHelper.checkPersonMapsEqual(loadedPersons, expectedPersons, true);

        if (!bulkMatches)
            throw new RuntimeException("Person values batch was incorrectly deserialized from Cassandra");

        LOGGER.info("Bulk operation read tests passed");

        LOGGER.info("POJO strategy read tests passed");

        LOGGER.info("Running POJO strategy delete tests");

        personCache.remove(singleId);
        personCache.removeAll(expectedPersons.keySet());

        LOGGER.info("POJO strategy delete tests passed");
    }
}
/**
 * Builds an Ignite configuration with a single read-through / write-through cache named
 * "cache1" backed by a {@link CassandraCacheStoreFactory}. The persistence settings are read
 * from the {@code persistence-settings-3.xml} classpath resource and handed to the factory
 * as a {@link KeyValuePersistenceSettings} object.
 * <p>
 * The resource location is converted through {@link URL#toURI()} rather than
 * {@link URL#getFile()}, because the latter returns a URL-encoded path that does not
 * resolve on the file system when it contains spaces or other escaped characters.
 *
 * @return Ignite configuration containing the "cache1" cache configuration.
 * @throws IOException If the persistence settings resource is missing or cannot be read.
 */
private IgniteConfiguration igniteConfig() throws IOException {
    String settingsPath = "org/apache/ignite/tests/persistence/pojo/persistence-settings-3.xml";

    URL url = getClass().getClassLoader().getResource(settingsPath);

    if (url == null)
        throw new IOException("Persistence settings resource not found on classpath: " + settingsPath);

    String persistence;

    try {
        persistence = new String(Files.readAllBytes(Paths.get(url.toURI())), StandardCharsets.UTF_8);
    }
    catch (URISyntaxException e) {
        throw new IOException("Invalid location of persistence settings resource: " + url, e);
    }

    KeyValuePersistenceSettings persistenceSettings = new KeyValuePersistenceSettings(persistence);

    DataSource dataSource = new DataSource();
    dataSource.setContactPoints(CassandraHelper.getContactPointsArray());
    dataSource.setCredentials(new CassandraAdminCredentials());
    dataSource.setLoadBalancingPolicy(new RoundRobinPolicy());

    // Key type matches how the cache is used by directPersistenceConfigTest (PersonId keys).
    CassandraCacheStoreFactory<PersonId, Person> storeFactory = new CassandraCacheStoreFactory<>();
    storeFactory.setDataSource(dataSource);
    storeFactory.setPersistenceSettings(persistenceSettings);

    CacheConfiguration<PersonId, Person> cacheConfiguration = new CacheConfiguration<>();
    cacheConfiguration.setName("cache1");
    cacheConfiguration.setReadThrough(true);
    cacheConfiguration.setWriteThrough(true);
    cacheConfiguration.setCacheStoreFactory(storeFactory);

    IgniteConfiguration config = new IgniteConfiguration();
    config.setCacheConfiguration(cacheConfiguration);

    return config;
}
/**
 * Minimal value class holding an integer id and a name; used as the cache value type by
 * {@code blobBinaryLoadCacheTest}.
 */
public static class PojoPerson {
    /** Person identifier. */
    private int id;

    /** Person name. */
    private String name;

    /** Creates an instance whose fields keep their default values. */
    public PojoPerson() {
        // No-op.
    }

    /**
     * Creates an instance with the given field values.
     *
     * @param id Person identifier.
     * @param name Person name.
     */
    public PojoPerson(int id, String name) {
        this.name = name;
        this.id = id;
    }

    /**
     * @return Person identifier.
     */
    public int getId() {
        return this.id;
    }

    /**
     * @return Person name.
     */
    public String getName() {
        return this.name;
    }
}
}