blob: 9abd807a5c5af8181c10d28ff3d8770c2c440889 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheInterceptor;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
/** */
public class IgniteSqlNotNullConstraintTest extends AbstractIndexingCommonTest {
/** Name of client node. */
private static final String NODE_CLIENT = "client";
/** Number of server nodes. */
private static final int NODE_COUNT = 2;
/** Prefix shared by the names of all "person" caches built by this test. */
private static final String CACHE_PREFIX = "person";
/** Name of the partitioned transactional person cache without near cache, write-through or "-annot" suffix. */
private static final String CACHE_PERSON = "person-PARTITIONED-TRANSACTIONAL";
/** Quoted, schema-qualified name of the PERSON SQL table that lives in the {@link #CACHE_PERSON} schema. */
private static final String TABLE_PERSON = "\"" + CACHE_PERSON + "\".\"PERSON\"";
/** Name of the cache template registered with read-through enabled. */
private static final String CACHE_READ_THROUGH = "cacheReadThrough";
/** Name of the cache template registered with a cache interceptor. */
private static final String CACHE_INTERCEPTOR = "cacheInterceptor";
/** Expected error message when the NOT NULL constraint on column NAME is violated. */
private static final String ERR_MSG = "Null value is not allowed for column 'NAME'";
/** Expected error message for read-through restriction. */
private static final String READ_THROUGH_ERR_MSG = "NOT NULL constraint is not supported when " +
"CacheConfiguration.readThrough is enabled.";
/** Expected error message for cache interceptor restriction. */
private static final String INTERCEPTOR_ERR_MSG = "NOT NULL constraint is not supported when " +
"CacheConfiguration.interceptor is set.";
/** Name of the node whose configuration includes a NOT NULL cache with read-through enabled. */
private static final String READ_THROUGH_CFG_NODE_NAME = "nodeCacheReadThrough";
/** Name of the node whose configuration includes a NOT NULL cache with an interceptor set. */
private static final String INTERCEPTOR_CFG_NODE_NAME = "nodeCacheInterceptor";
/** Value that satisfies the NOT NULL constraint (non-null name). */
private final Person okValue = new Person("Name", 18);
/** Bad value, violating constraint: its name is {@code null}. */
private final Person badValue = new Person(null, 25);
/**
 * {@inheritDoc}
 * <p>
 * Every node gets the full set of person caches; the two specially named nodes additionally
 * receive a restricted (read-through or interceptor) cache with a NOT NULL field.
 */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
    IgniteConfiguration igniteCfg = super.getConfiguration(gridName);

    List<CacheConfiguration> caches = new ArrayList<>(cacheConfigurations());

    boolean readThroughNode = gridName.equals(READ_THROUGH_CFG_NODE_NAME);

    if (readThroughNode)
        caches.add(buildCacheConfigurationRestricted("BadCfgTestCacheRT", true, false, true));

    boolean interceptorNode = gridName.equals(INTERCEPTOR_CFG_NODE_NAME);

    if (interceptorNode)
        caches.add(buildCacheConfigurationRestricted("BadCfgTestCacheINT", false, true, true));

    CacheConfiguration[] cacheArr = caches.toArray(new CacheConfiguration[caches.size()]);

    igniteCfg.setCacheConfiguration(cacheArr);

    // Not allowed to have local cache on client without memory config
    if (gridName.equals(NODE_CLIENT))
        igniteCfg.setDataStorageConfiguration(new DataStorageConfiguration());

    return igniteCfg;
}
/**
 * Builds the matrix of person cache configurations used by the tests: for every combination of
 * write-through and annotation flags, replicated caches in both atomicity modes followed by
 * partitioned caches in both atomicity modes, each without and with a near cache.
 *
 * @return Freshly built cache configurations.
 */
private List<CacheConfiguration> cacheConfigurations() {
    boolean[] flags = {false, true};

    CacheAtomicityMode[] atomicityModes = {CacheAtomicityMode.ATOMIC, CacheAtomicityMode.TRANSACTIONAL};

    List<CacheConfiguration> cfgs = new ArrayList<>();

    for (boolean writeThrough : flags) {
        for (boolean annotated : flags) {
            // Replicated caches are only built without a near cache.
            for (CacheAtomicityMode atomicity : atomicityModes)
                cfgs.add(buildCacheConfiguration(CacheMode.REPLICATED, atomicity, false, writeThrough, annotated));

            // Partitioned caches come in pairs: plain first, then with a near cache.
            for (CacheAtomicityMode atomicity : atomicityModes) {
                for (boolean near : flags)
                    cfgs.add(buildCacheConfiguration(CacheMode.PARTITIONED, atomicity, near, writeThrough, annotated));
            }
        }
    }

    return cfgs;
}
/**
 * Builds a person cache configuration whose name encodes all of the given settings.
 *
 * @param mode Cache mode.
 * @param atomicityMode Atomicity mode.
 * @param hasNear Whether to configure a near cache.
 * @param writeThrough Whether to attach a {@code TestStore} and enable write-through.
 * @param notNullAnnotated When {@code false}, the "name" field is additionally declared NOT NULL
 *      explicitly on the query entity.
 * @return Cache configuration.
 */
private CacheConfiguration buildCacheConfiguration(CacheMode mode,
    CacheAtomicityMode atomicityMode, boolean hasNear, boolean writeThrough, boolean notNullAnnotated) {
    StringBuilder name = new StringBuilder(CACHE_PREFIX)
        .append("-").append(mode.name())
        .append("-").append(atomicityMode.name());

    if (hasNear)
        name.append("-near");

    if (writeThrough)
        name.append("-writethrough");

    if (notNullAnnotated)
        name.append("-annot");

    CacheConfiguration ccfg = new CacheConfiguration(name.toString());

    ccfg.setCacheMode(mode);
    ccfg.setAtomicityMode(atomicityMode);
    ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);

    QueryEntity entity = new QueryEntity(new QueryEntity(Integer.class, Person.class));

    if (!notNullAnnotated)
        entity.setNotNullFields(Collections.singleton("name"));

    ccfg.setQueryEntities(F.asList(entity));

    if (hasNear) {
        NearCacheConfiguration nearCfg = new NearCacheConfiguration().setNearStartSize(100);

        ccfg.setNearConfiguration(nearCfg);
    }

    if (writeThrough) {
        ccfg.setCacheStoreFactory(singletonFactory(new TestStore()));
        ccfg.setWriteThrough(true);
    }

    return ccfg;
}
/**
 * Builds a partitioned transactional cache configuration that may use features which are
 * incompatible with NOT NULL constraints.
 *
 * @param cacheName Cache name.
 * @param readThrough Whether to attach a {@code TestStore} and enable read-through.
 * @param interceptor Whether to set a {@code TestInterceptor}.
 * @param hasQueryEntity Whether to add a Person query entity with a NOT NULL "name" field.
 * @return Cache configuration.
 */
private CacheConfiguration buildCacheConfigurationRestricted(String cacheName, boolean readThrough,
    boolean interceptor, boolean hasQueryEntity) {
    CacheConfiguration ccfg = new CacheConfiguration<Integer, Person>();

    ccfg.setName(cacheName);
    ccfg.setCacheMode(CacheMode.PARTITIONED);
    ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);

    if (readThrough) {
        ccfg.setCacheStoreFactory(singletonFactory(new TestStore()));
        ccfg.setReadThrough(true);
    }

    if (interceptor)
        ccfg.setInterceptor(new TestInterceptor());

    if (hasQueryEntity) {
        QueryEntity entity = new QueryEntity(Integer.class, Person.class);

        entity.setNotNullFields(Collections.singleton("name"));

        ccfg.setQueryEntities(F.asList(entity));
    }

    return ccfg;
}
/**
 * {@inheritDoc}
 * <p>
 * Starts the server nodes and the client, then registers the read-through and interceptor
 * cache templates through the client.
 */
@Override protected void beforeTestsStarted() throws Exception {
    super.beforeTestsStarted();

    startGrids(NODE_COUNT);
    startClientGrid(NODE_CLIENT);

    // Add cache template with read-through cache store.
    CacheConfiguration readThroughTemplate =
        buildCacheConfigurationRestricted(CACHE_READ_THROUGH, true, false, false);

    grid(NODE_CLIENT).addCacheConfiguration(readThroughTemplate);

    // Add cache template with cache interceptor.
    CacheConfiguration interceptorTemplate =
        buildCacheConfigurationRestricted(CACHE_INTERCEPTOR, false, true, false);

    grid(NODE_CLIENT).addCacheConfiguration(interceptorTemplate);

    awaitPartitionMapExchange();
}
/**
 * {@inheritDoc}
 * <p>
 * After the inherited tear-down, resets cache contents and drops the SQL table via {@link #cleanup()}.
 */
@Override protected void afterTest() throws Exception {
super.afterTest();
cleanup();
}
/**
 * We need to keep serialized cache store factories to share them between tests.
 *
 * @return Always {@code true}.
 */
@Override protected boolean keepSerializedObjects() {
return true;
}
/**
 * Checks that {@code QueryEntity.getNotNullFields()} returns what was passed to
 * {@code setNotNullFields()}, including {@code null} for a fresh entity and after a reset.
 *
 * @throws Exception If failed.
 */
@Test
public void testQueryEntityGetSetNotNullFields() throws Exception {
    QueryEntity qe = new QueryEntity();

    assertNull(qe.getNotNullFields());

    Set<String> val = Collections.singleton("test");

    qe.setNotNullFields(val);

    // Compare against the getter: comparing 'val' with another singleton would not exercise the entity at all.
    assertEquals(val, qe.getNotNullFields());

    qe.setNotNullFields(null);

    assertNull(qe.getNotNullFields());
}
/**
 * Checks that the NOT NULL field set takes part in {@code QueryEntity.equals()}.
 *
 * @throws Exception If failed.
 */
@Test
public void testQueryEntityEquals() throws Exception {
    QueryEntity first = new QueryEntity();
    QueryEntity second = new QueryEntity();

    // Two fresh entities are equal.
    assertEquals(first, second);

    // Only one of them has NOT NULL fields: no longer equal.
    first.setNotNullFields(Collections.singleton("test"));

    assertFalse(first.equals(second));

    // Same NOT NULL fields on both: equal again.
    second.setNotNullFields(Collections.singleton("test"));

    assertTrue(first.equals(second));
}
/**
 * For every person cache: a put of a valid value succeeds, a put of a value with a {@code null}
 * name fails with the NOT NULL error.
 *
 * @throws Exception If failed.
 */
@Test
public void testAtomicOrImplicitTxPut() throws Exception {
    executeWithAllCaches(new TestClosure() {
        @Override public void run() throws Exception {
            cache.put(key1, okValue);

            GridTestUtils.assertThrowsAnyCause(log, () -> {
                cache.put(key2, badValue);

                return null;
            }, IgniteCheckedException.class, ERR_MSG);
        }
    });
}
/**
 * For every person cache: {@code putIfAbsent} of a constraint-violating value fails and stores nothing.
 *
 * @throws Exception If failed.
 */
@Test
public void testAtomicOrImplicitTxPutIfAbsent() throws Exception {
    executeWithAllCaches(new TestClosure() {
        @Override public void run() throws Exception {
            GridTestUtils.assertThrowsAnyCause(log, () -> {
                cache.putIfAbsent(key1, badValue);

                return null;
            }, IgniteCheckedException.class, ERR_MSG);

            assertEquals(0, cache.size());
        }
    });
}
/**
 * For every person cache: {@code getAndPut} of a constraint-violating value fails and stores nothing.
 *
 * @throws Exception If failed.
 */
@Test
public void testAtomicOrImplicitTxGetAndPut() throws Exception {
    executeWithAllCaches(new TestClosure() {
        @Override public void run() throws Exception {
            GridTestUtils.assertThrowsAnyCause(log, () -> {
                cache.getAndPut(key1, badValue);

                return null;
            }, IgniteCheckedException.class, ERR_MSG);

            assertEquals(0, cache.size());
        }
    });
}
/**
 * For every person cache: {@code getAndPutIfAbsent} of a constraint-violating value fails and
 * the cache stays empty.
 *
 * @throws Exception If failed.
 */
@Test
public void testAtomicOrImplicitTxGetAndPutIfAbsent() throws Exception {
    executeWithAllCaches(new TestClosure() {
        @Override public void run() throws Exception {
            assertEquals(0, cache.size());

            GridTestUtils.assertThrowsAnyCause(log, () -> {
                return cache.getAndPutIfAbsent(key1, badValue);
            }, IgniteCheckedException.class, ERR_MSG);

            assertEquals(0, cache.size());
        }
    });
}
/**
 * For every person cache: {@code replace} with a constraint-violating value fails and the
 * previously stored valid value is kept.
 *
 * @throws Exception If failed.
 */
@Test
public void testAtomicOrImplicitTxReplace() throws Exception {
    executeWithAllCaches(new TestClosure() {
        @Override public void run() throws Exception {
            cache.put(key1, okValue);

            GridTestUtils.assertThrowsAnyCause(log, () -> {
                return cache.replace(key1, badValue);
            }, IgniteCheckedException.class, ERR_MSG);

            // The original entry must be untouched.
            assertEquals(1, cache.size());
            assertEquals(okValue, cache.get(key1));
        }
    });
}
/**
 * For every person cache: {@code getAndReplace} with a constraint-violating value fails and the
 * previously stored valid value is kept.
 *
 * @throws Exception If failed.
 */
@Test
public void testAtomicOrImplicitTxGetAndReplace() throws Exception {
    executeWithAllCaches(new TestClosure() {
        @Override public void run() throws Exception {
            cache.put(key1, okValue);

            GridTestUtils.assertThrowsAnyCause(log, () -> {
                return cache.getAndReplace(key1, badValue);
            }, IgniteCheckedException.class, ERR_MSG);

            // The original entry must be untouched.
            assertEquals(1, cache.size());
            assertEquals(okValue, cache.get(key1));
        }
    });
}
/**
 * Runs the implicit {@code putAll} check with a batch mixing one valid and one invalid value.
 *
 * @throws Exception If failed.
 */
@Test
public void testAtomicOrImplicitTxPutAll() throws Exception {
    Map<Integer, Person> batch = F.asMap(1, okValue, 5, badValue);

    doAtomicOrImplicitTxPutAll(batch, 1);
}
/**
 * Runs the implicit {@code putAll} check with a batch holding a single invalid value.
 *
 * @throws Exception If failed.
 */
@Test
public void testAtomicOrImplicitTxPutAllForSingleValue() throws Exception {
    Map<Integer, Person> batch = F.asMap(5, badValue);

    doAtomicOrImplicitTxPutAll(batch, 0);
}
/**
 * For every person cache: an entry processor that tries to set a constraint-violating value
 * fails and stores nothing.
 *
 * @throws Exception If failed.
 */
@Test
public void testAtomicOrImplicitTxInvoke() throws Exception {
    executeWithAllCaches(new TestClosure() {
        @Override public void run() throws Exception {
            GridTestUtils.assertThrowsAnyCause(log, () -> {
                return cache.invoke(key1, new TestEntryProcessor(badValue));
            }, IgniteCheckedException.class, ERR_MSG);

            assertEquals(0, cache.size());
        }
    });
}
/**
 * For every person cache: {@code invokeAll} with one valid and one invalid processor reports
 * the NOT NULL error through the result of the invalid key and stores only the valid entry.
 *
 * @throws Exception If failed.
 */
@Test
public void testAtomicOrImplicitTxInvokeAll() throws Exception {
    executeWithAllCaches(new TestClosure() {
        @Override public void run() throws Exception {
            final Map<Integer, EntryProcessorResult<Object>> results = cache.invokeAll(F.asMap(
                key1, new TestEntryProcessor(okValue),
                key2, new TestEntryProcessor(badValue)));

            assertNotNull(results);

            // The failure surfaces only when the per-key result is read.
            GridTestUtils.assertThrowsAnyCause(log, () -> {
                return results.get(key2).get();
            }, IgniteCheckedException.class, ERR_MSG);

            assertEquals(1, cache.size());
        }
    });
}
/**
 * For every transactional cache and every concurrency/isolation pair: an explicit transaction
 * that puts a valid value and then a constraint-violating one must fail and leave the cache empty.
 *
 * @throws Exception If failed.
 */
@Test
public void testTxPutCreate() throws Exception {
    executeWithAllTxCaches(new TestClosure() {
        @Override public void run() throws Exception {
            GridTestUtils.assertThrows(log, () -> {
                try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
                    cache.put(key1, okValue);
                    cache.put(key2, badValue);

                    tx.commit();
                }

                return null;
            }, CacheException.class, ERR_MSG);

            // Verified after assertThrows(): inside the callable this check sat behind the statement
            // that is expected to throw, so it never executed.
            assertEquals(0, cache.size());
        }
    });
}
/**
 * For every transactional cache and every concurrency/isolation pair: an explicit transaction
 * that overwrites a value it has just put with a constraint-violating one must fail and leave
 * the cache empty.
 *
 * @throws Exception If failed.
 */
@Test
public void testTxPutUpdate() throws Exception {
    executeWithAllTxCaches(new TestClosure() {
        @Override public void run() throws Exception {
            GridTestUtils.assertThrows(log, () -> {
                try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
                    cache.put(key1, okValue);
                    cache.put(key2, okValue);
                    cache.put(key2, badValue);

                    tx.commit();
                }

                return null;
            }, CacheException.class, ERR_MSG);

            // Verified after assertThrows(): inside the callable this check sat behind the statement
            // that is expected to throw, so it never executed.
            assertEquals(0, cache.size());
        }
    });
}
/**
 * For every transactional cache and every concurrency/isolation pair: {@code putIfAbsent} of a
 * constraint-violating value inside an explicit transaction must fail and leave the cache empty.
 *
 * @throws Exception If failed.
 */
@Test
public void testTxPutIfAbsent() throws Exception {
    executeWithAllTxCaches(new TestClosure() {
        @Override public void run() throws Exception {
            GridTestUtils.assertThrows(log, () -> {
                try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
                    cache.putIfAbsent(key1, badValue);

                    tx.commit();
                }

                return null;
            }, CacheException.class, ERR_MSG);

            // Verified after assertThrows(): inside the callable this check sat behind the statement
            // that is expected to throw, so it never executed.
            assertEquals(0, cache.size());
        }
    });
}
/**
 * For every transactional cache and every concurrency/isolation pair: {@code getAndPut} of a
 * constraint-violating value inside an explicit transaction must fail and leave the cache empty.
 *
 * @throws Exception If failed.
 */
@Test
public void testTxGetAndPut() throws Exception {
    executeWithAllTxCaches(new TestClosure() {
        @Override public void run() throws Exception {
            GridTestUtils.assertThrows(log, () -> {
                try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
                    cache.getAndPut(key1, badValue);

                    tx.commit();
                }

                return null;
            }, CacheException.class, ERR_MSG);

            // Verified after assertThrows(): inside the callable this check sat behind the statement
            // that is expected to throw, so it never executed.
            assertEquals(0, cache.size());
        }
    });
}
/**
 * For every transactional cache and every concurrency/isolation pair: {@code getAndPutIfAbsent}
 * of a constraint-violating value inside an explicit transaction must fail and leave the cache empty.
 *
 * @throws Exception If failed.
 */
@Test
public void testTxGetAndPutIfAbsent() throws Exception {
    executeWithAllTxCaches(new TestClosure() {
        @Override public void run() throws Exception {
            GridTestUtils.assertThrows(log, () -> {
                try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
                    cache.getAndPutIfAbsent(key1, badValue);

                    tx.commit();
                }

                return null;
            }, CacheException.class, ERR_MSG);

            // Verified after assertThrows(): inside the callable this check sat behind the statement
            // that is expected to throw, so it never executed.
            assertEquals(0, cache.size());
        }
    });
}
/**
 * For every transactional cache and every concurrency/isolation pair: {@code replace} with a
 * constraint-violating value inside an explicit transaction must fail and keep the previously
 * stored valid value.
 *
 * @throws Exception If failed.
 */
@Test
public void testTxReplace() throws Exception {
    executeWithAllTxCaches(new TestClosure() {
        @Override public void run() throws Exception {
            // Seed the entry under key1 (the original used the literal 1, which only worked
            // because key1 happens to be 1).
            cache.put(key1, okValue);

            GridTestUtils.assertThrows(log, () -> {
                try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
                    cache.replace(key1, badValue);

                    tx.commit();
                }

                return null;
            }, CacheException.class, ERR_MSG);

            // Verified after assertThrows(): inside the callable these checks sat behind the
            // statement that is expected to throw, so they never executed.
            assertEquals(1, cache.size());
            assertEquals(okValue, cache.get(key1));
        }
    });
}
/**
 * For every transactional cache and every concurrency/isolation pair: {@code getAndReplace} with
 * a constraint-violating value inside an explicit transaction must fail and keep the previously
 * stored valid value.
 *
 * @throws Exception If failed.
 */
@Test
public void testTxGetAndReplace() throws Exception {
    executeWithAllTxCaches(new TestClosure() {
        @Override public void run() throws Exception {
            cache.put(key1, okValue);

            GridTestUtils.assertThrows(log, () -> {
                try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
                    cache.getAndReplace(key1, badValue);

                    tx.commit();
                }

                return null;
            }, CacheException.class, ERR_MSG);

            // Verified after assertThrows(): inside the callable these checks sat behind the
            // statement that is expected to throw, so they never executed.
            assertEquals(1, cache.size());
            assertEquals(okValue, cache.get(key1));
        }
    });
}
/**
 * Runs the transactional {@code putAll} check with a batch mixing one valid and one invalid value.
 *
 * @throws Exception If failed.
 */
@Test
public void testTxPutAll() throws Exception {
    Map<Integer, Person> batch = F.asMap(1, okValue, 5, badValue);

    doTxPutAll(batch);
}
/**
 * Runs the transactional {@code putAll} check with a batch holding a single invalid value.
 *
 * @throws Exception If failed.
 */
@Test
public void testTxPutAllForSingleValue() throws Exception {
    Map<Integer, Person> batch = F.asMap(5, badValue);

    doTxPutAll(batch);
}
/**
 * For every transactional cache and every concurrency/isolation pair: an entry processor that
 * sets a constraint-violating value inside an explicit transaction fails and the previously
 * stored valid value is kept.
 *
 * @throws Exception If failed.
 */
@Test
public void testTxInvoke() throws Exception {
    executeWithAllTxCaches(new TestClosure() {
        @Override public void run() throws Exception {
            cache.put(key1, okValue);

            GridTestUtils.assertThrows(log, () -> {
                try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
                    cache.invoke(key1, new TestEntryProcessor(badValue));

                    tx.commit();
                }

                return null;
            }, EntryProcessorException.class, ERR_MSG);

            // The original entry must be untouched.
            assertEquals(1, cache.size());
            assertEquals(okValue, cache.get(key1));
        }
    });
}
/**
 * For every transactional cache and every concurrency/isolation pair: {@code invokeAll} inside
 * an explicit transaction reports the NOT NULL error through the result of the invalid key;
 * the transaction is then rolled back and the cache must be empty.
 *
 * @throws Exception If failed.
 */
@Test
public void testTxInvokeAll() throws Exception {
    executeWithAllTxCaches(new TestClosure() {
        @Override public void run() throws Exception {
            try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
                final Map<Integer, EntryProcessorResult<Object>> results = cache.invokeAll(F.asMap(
                    key1, new TestEntryProcessor(okValue),
                    key2, new TestEntryProcessor(badValue)));

                assertNotNull(results);

                // The failure surfaces only when the per-key result is read.
                GridTestUtils.assertThrows(log, () -> {
                    return results.get(key2).get();
                }, EntryProcessorException.class, ERR_MSG);

                tx.rollback();
            }

            assertEquals(0, cache.size());
        }
    });
}
/**
 * For every person cache: invoking {@code TestEntryProcessor} built with {@code null} on an
 * existing entry is accepted and leaves the cache empty.
 *
 * @throws Exception If failed.
 */
@Test
public void testAtomicOrImplicitTxInvokeDelete() throws Exception {
    executeWithAllCaches(new TestClosure() {
        @Override public void run() throws Exception {
            cache.put(key1, okValue);

            TestEntryProcessor deleter = new TestEntryProcessor(null);

            cache.invoke(key1, deleter);

            assertEquals(0, cache.size());
        }
    });
}
/**
 * For every person cache: {@code invokeAll} with {@code TestEntryProcessor}s built with
 * {@code null} on two existing entries is accepted and leaves the cache empty.
 *
 * @throws Exception If failed.
 */
@Test
public void testAtomicOrImplicitTxInvokeAllDelete() throws Exception {
    executeWithAllCaches(new TestClosure() {
        @Override public void run() throws Exception {
            // Seed both keys with valid values.
            cache.put(key1, okValue);
            cache.put(key2, okValue);

            cache.invokeAll(F.asMap(
                key1, new TestEntryProcessor(null),
                key2, new TestEntryProcessor(null)));

            assertEquals(0, cache.size());
        }
    });
}
/**
 * For every transactional cache and every concurrency/isolation pair: invoking
 * {@code TestEntryProcessor} built with {@code null} inside a committed explicit transaction
 * leaves the cache empty.
 *
 * @throws Exception If failed.
 */
@Test
public void testTxInvokeDelete() throws Exception {
    executeWithAllTxCaches(new TestClosure() {
        @Override public void run() throws Exception {
            cache.put(key1, okValue);

            try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
                TestEntryProcessor deleter = new TestEntryProcessor(null);

                cache.invoke(key1, deleter);

                tx.commit();
            }

            assertEquals(0, cache.size());
        }
    });
}
/**
 * For every transactional cache and every concurrency/isolation pair: {@code invokeAll} with
 * {@code TestEntryProcessor}s built with {@code null} inside a committed explicit transaction
 * leaves the cache empty.
 *
 * @throws Exception If failed.
 */
@Test
public void testTxInvokeAllDelete() throws Exception {
    executeWithAllTxCaches(new TestClosure() {
        @Override public void run() throws Exception {
            // Seed both keys with valid values outside the transaction.
            cache.put(key1, okValue);
            cache.put(key2, okValue);

            try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
                cache.invokeAll(F.asMap(
                    key1, new TestEntryProcessor(null),
                    key2, new TestEntryProcessor(null)));

                tx.commit();
            }

            assertEquals(0, cache.size());
        }
    });
}
/**
 * Checks that CREATE TABLE with a NOT NULL column records that column in the query entity of
 * the created cache, both in the client's cache configuration and in the state of every node.
 *
 * @throws Exception If failed.
 */
@Test
public void testDynamicTableCreateNotNullFieldsAllowed() throws Exception {
    executeSql("CREATE TABLE test(id INT PRIMARY KEY, field INT NOT NULL)");

    String tblCacheName = QueryUtils.createTableCacheName("PUBLIC", "TEST");

    IgniteEx clientNode = grid(NODE_CLIENT);

    CacheConfiguration tblCacheCfg = clientNode.context().cache().cache(tblCacheName).configuration();

    QueryEntity entity = (QueryEntity)F.first(tblCacheCfg.getQueryEntities());

    assertEquals(Collections.singleton("FIELD"), entity.getNotNullFields());

    checkState("PUBLIC", "TEST", "FIELD");
}
/**
 * Checks that a NOT NULL column added via ALTER TABLE is recorded in the query entity on every node.
 *
 * @throws Exception If failed.
 */
@Test
public void testAlterTableAddColumnNotNullFieldAllowed() throws Exception {
    String createSql = "CREATE TABLE test(id INT PRIMARY KEY, age INT)";
    String alterSql = "ALTER TABLE test ADD COLUMN name CHAR NOT NULL";

    executeSql(createSql);
    executeSql(alterSql);

    checkState("PUBLIC", "TEST", "NAME");
}
/**
 * Runs the INSERT ... VALUES NOT NULL check against a table created with ATOMIC atomicity.
 *
 * @throws Exception If failed.
 */
@Test
public void testAtomicNotNullCheckDmlInsertValues() throws Exception {
    CacheAtomicityMode mode = CacheAtomicityMode.ATOMIC;

    checkNotNullCheckDmlInsertValues(mode);
}
/**
 * Runs the INSERT ... VALUES NOT NULL check against a table created with TRANSACTIONAL atomicity.
 *
 * @throws Exception If failed.
 */
@Test
public void testTransactionalNotNullCheckDmlInsertValues() throws Exception {
    CacheAtomicityMode mode = CacheAtomicityMode.TRANSACTIONAL;

    checkNotNullCheckDmlInsertValues(mode);
}
/**
 * Creates the test table with a NOT NULL name column and the given atomicity, then runs the
 * shared insert checks.
 *
 * @param atomicityMode Atomicity mode passed to CREATE TABLE.
 * @throws Exception If failed.
 */
private void checkNotNullCheckDmlInsertValues(CacheAtomicityMode atomicityMode) throws Exception {
    String createSql = "CREATE TABLE test(id INT PRIMARY KEY, name VARCHAR NOT NULL, age INT) WITH \"atomicity="
        + atomicityMode.name() + "\"";

    executeSql(createSql);

    checkNotNullInsertValues();
}
/**
 * Runs the INSERT ... VALUES check for a NOT NULL column added via ALTER TABLE to an ATOMIC table.
 *
 * @throws Exception If failed.
 */
@Test
public void testAtomicAddColumnNotNullCheckDmlInsertValues() throws Exception {
    CacheAtomicityMode mode = CacheAtomicityMode.ATOMIC;

    checkAddColumnNotNullCheckDmlInsertValues(mode);
}
/**
 * Runs the INSERT ... VALUES check for a NOT NULL column added via ALTER TABLE to a
 * TRANSACTIONAL table.
 *
 * @throws Exception If failed.
 */
@Test
public void testTransactionalAddColumnNotNullCheckDmlInsertValues() throws Exception {
    CacheAtomicityMode mode = CacheAtomicityMode.TRANSACTIONAL;

    checkAddColumnNotNullCheckDmlInsertValues(mode);
}
/**
 * Creates the test table with the given atomicity, adds a NOT NULL name column via ALTER TABLE,
 * then runs the shared insert checks.
 *
 * @param atomicityMode Atomicity mode passed to CREATE TABLE.
 * @throws Exception If failed.
 */
private void checkAddColumnNotNullCheckDmlInsertValues(CacheAtomicityMode atomicityMode) throws Exception {
    String createSql = "CREATE TABLE test(id INT PRIMARY KEY, age INT) WITH \"atomicity="
        + atomicityMode.name() + "\"";

    executeSql(createSql);

    executeSql("ALTER TABLE test ADD COLUMN name VARCHAR NOT NULL");

    checkNotNullInsertValues();
}
/**
 * Checks that inserting a row whose name evaluates to NULL is rejected and leaves the table
 * empty, while a multi-row insert of valid names succeeds.
 *
 * @throws Exception If failed.
 */
private void checkNotNullInsertValues() throws Exception {
    GridTestUtils.assertThrows(log(), () -> {
        executeSql("INSERT INTO test(id, name, age) VALUES (2, NULLIF('a', 'a'), 2)");

        return null;
    }, IgniteSQLException.class, ERR_MSG);

    // The rejected row must not be visible.
    List<List<?>> rows = executeSql("SELECT id, name, age FROM test ORDER BY id");

    assertEquals(0, rows.size());

    executeSql("INSERT INTO test(id, name) VALUES (1, 'ok'), (2, 'ok2'), (3, 'ok3')");

    rows = executeSql("SELECT id, name FROM test ORDER BY id");

    assertEquals(3, rows.size());
}
/**
 * Checks INSERT ... SELECT into the person table: it is rejected while the source contains a
 * row with a NULL name and succeeds once the source holds only valid rows.
 *
 * @throws Exception If failed.
 */
@Test
public void testNotNullCheckDmlInsertFromSelect() throws Exception {
    executeSql("CREATE TABLE test(id INT PRIMARY KEY, name VARCHAR, age INT)");

    executeSql("INSERT INTO test(id, name, age) VALUES (2, null, 25)");

    GridTestUtils.assertThrows(log(), () -> {
        return executeSql("INSERT INTO " + TABLE_PERSON +
            "(_key, name, age) " +
            "SELECT id, name, age FROM test");
    }, IgniteSQLException.class, ERR_MSG);

    // Nothing may have been copied by the failed statement.
    List<List<?>> rows = executeSql("SELECT _key, name FROM " + TABLE_PERSON + " ORDER BY _key");

    assertEquals(0, rows.size());

    // Replace the offending source row with two valid ones and retry.
    executeSql("INSERT INTO test(id, name, age) VALUES (1, 'Macy', 25), (3, 'John', 30)");
    executeSql("DELETE FROM test WHERE id = 2");

    rows = executeSql("INSERT INTO " + TABLE_PERSON + "(_key, name, age) " + "SELECT id, name, age FROM test");

    // The DML result carries the update count.
    assertEquals(2L, rows.get(0).get(0));
}
/**
 * Checks UPDATE on a NOT NULL column: setting it to an expression that evaluates to NULL is
 * rejected and keeps the old value, while setting a valid value succeeds.
 *
 * @throws Exception If failed.
 */
@Test
public void testNotNullCheckDmlUpdateValues() throws Exception {
    executeSql("CREATE TABLE test(id INT PRIMARY KEY, name VARCHAR NOT NULL)");

    executeSql("INSERT INTO test(id, name) VALUES (1, 'John')");

    GridTestUtils.assertThrows(log(), () -> {
        return executeSql("UPDATE test SET name = NULLIF(id, 1) WHERE id = 1");
    }, IgniteSQLException.class, ERR_MSG);

    // The row must be unchanged after the rejected update.
    List<List<?>> rows = executeSql("SELECT id, name FROM test");

    assertEquals(1, rows.size());

    List<?> row = rows.get(0);

    assertEquals(1, row.get(0));
    assertEquals("John", row.get(1));

    executeSql("UPDATE test SET name = 'James' WHERE id = 1");

    rows = executeSql("SELECT id, name FROM test");

    row = rows.get(0);

    assertEquals(1, row.get(0));
    assertEquals("James", row.get(1));
}
/**
 * Checks UPDATE ... SET (col) = (SELECT ...) on a NOT NULL column: it is rejected when the
 * subquery yields NULL and leaves the destination untouched, and succeeds once the source
 * holds only non-null names.
 *
 * @throws Exception If failed.
 */
@Test
public void testNotNullCheckDmlUpdateFromSelect() throws Exception {
    executeSql("CREATE TABLE src(id INT PRIMARY KEY, name VARCHAR)");
    executeSql("CREATE TABLE dest(id INT PRIMARY KEY, name VARCHAR NOT NULL)");

    executeSql("INSERT INTO dest(id, name) VALUES (1, 'William'), (2, 'Warren'), (3, 'Robert')");
    executeSql("INSERT INTO src(id, name) VALUES (1, 'Bill'), (2, null), (3, 'Bob')");

    GridTestUtils.assertThrows(log(), () -> {
        return executeSql("UPDATE dest p " +
            "SET (name) = (SELECT name FROM src t WHERE p.id = t.id) " +
            "WHERE p.id = 2");
    }, IgniteSQLException.class, ERR_MSG);

    // Destination rows must still hold their original names, ordered by id 1..3.
    String[] originalNames = {"William", "Warren", "Robert"};

    List<List<?>> rows = executeSql("SELECT id, name FROM dest ORDER BY id");

    assertEquals(3, rows.size());

    for (int i = 0; i < originalNames.length; i++) {
        assertEquals(i + 1, rows.get(i).get(0));
        assertEquals(originalNames[i], rows.get(i).get(1));
    }

    // Fix the NULL name in the source and update all destination rows.
    executeSql("UPDATE src SET name = 'Ren' WHERE id = 2");
    executeSql("UPDATE dest p SET (name) = (SELECT name FROM src t WHERE p.id = t.id)");

    String[] updatedNames = {"Bill", "Ren", "Bob"};

    rows = executeSql("SELECT id, name FROM dest ORDER BY id");

    assertEquals(3, rows.size());

    for (int i = 0; i < updatedNames.length; i++) {
        assertEquals(i + 1, rows.get(i).get(0));
        assertEquals(updatedNames[i], rows.get(i).get(1));
    }
}
/**
 * Check QueryEntity configuration fails with NOT NULL field and read-through, both at node
 * start-up and on dynamic cache creation.
 *
 * @throws Exception If failed.
 */
@Test
public void testReadThroughRestrictionQueryEntity() throws Exception {
    // Node start-up failure (read-through cache store).
    GridTestUtils.assertThrowsAnyCause(log, () -> {
        return startClientGrid(READ_THROUGH_CFG_NODE_NAME);
    }, IgniteCheckedException.class, READ_THROUGH_ERR_MSG);

    // Dynamic cache start-up failure (read-through cache store)
    GridTestUtils.assertThrowsAnyCause(log, () -> {
        CacheConfiguration badCfg = buildCacheConfigurationRestricted("dynBadCfgCacheRT", true, false, true);

        return grid(NODE_CLIENT).createCache(badCfg);
    }, IgniteCheckedException.class, READ_THROUGH_ERR_MSG);
}
/**
 * Check QueryEntity configuration fails with NOT NULL field and cache interceptor, both at node
 * start-up and on dynamic cache creation.
 *
 * @throws Exception If failed.
 */
@Test
public void testInterceptorRestrictionQueryEntity() throws Exception {
    // Node start-up failure (interceptor).
    GridTestUtils.assertThrowsAnyCause(log, () -> {
        return startClientGrid(INTERCEPTOR_CFG_NODE_NAME);
    }, IgniteCheckedException.class, INTERCEPTOR_ERR_MSG);

    // Dynamic cache start-up failure (interceptor)
    GridTestUtils.assertThrowsAnyCause(log, () -> {
        CacheConfiguration badCfg = buildCacheConfigurationRestricted("dynBadCfgCacheINT", false, true, true);

        return grid(NODE_CLIENT).createCache(badCfg);
    }, IgniteCheckedException.class, INTERCEPTOR_ERR_MSG);
}
/**
 * Check create table fails with NOT NULL field and read-through.
 *
 * @throws Exception If failed.
 */
@Test
public void testReadThroughRestrictionCreateTable() throws Exception {
    GridTestUtils.assertThrowsAnyCause(log, () -> {
        String createSql = "CREATE TABLE test(id INT PRIMARY KEY, name char NOT NULL) " +
            "WITH \"template=" + CACHE_READ_THROUGH + "\"";

        return executeSql(createSql);
    }, IgniteSQLException.class, READ_THROUGH_ERR_MSG);
}
/**
 * Check create table fails with NOT NULL field and cache interceptor.
 *
 * @throws Exception If failed.
 */
@Test
public void testInterceptorRestrictionCreateTable() throws Exception {
    GridTestUtils.assertThrowsAnyCause(log, () -> {
        String createSql = "CREATE TABLE test(id INT PRIMARY KEY, name char NOT NULL) " +
            "WITH \"template=" + CACHE_INTERCEPTOR + "\"";

        return executeSql(createSql);
    }, IgniteSQLException.class, INTERCEPTOR_ERR_MSG);
}
/**
 * Check alter table fails with NOT NULL field and read-through: the table itself is created
 * from the read-through template without NOT NULL columns, then adding one must be rejected.
 *
 * @throws Exception If failed.
 */
@Test
public void testReadThroughRestrictionAlterTable() throws Exception {
    String createSql = "CREATE TABLE test(id INT PRIMARY KEY, age INT) " +
        "WITH \"template=" + CACHE_READ_THROUGH + "\"";

    executeSql(createSql);

    GridTestUtils.assertThrowsAnyCause(log, () -> {
        return executeSql("ALTER TABLE test ADD COLUMN name char NOT NULL");
    }, IgniteSQLException.class, READ_THROUGH_ERR_MSG);
}
/**
 * Check alter table fails with NOT NULL field and cache interceptor: the table itself is created
 * from the interceptor template without NOT NULL columns, then adding one must be rejected.
 *
 * @throws Exception If failed.
 */
@Test
public void testInterceptorRestrictionAlterTable() throws Exception {
    String createSql = "CREATE TABLE test(id INT PRIMARY KEY, age INT) " +
        "WITH \"template=" + CACHE_INTERCEPTOR + "\"";

    executeSql(createSql);

    GridTestUtils.assertThrowsAnyCause(log, () -> {
        return executeSql("ALTER TABLE test ADD COLUMN name char NOT NULL");
    }, IgniteSQLException.class, INTERCEPTOR_ERR_MSG);
}
/**
 * For every person cache: an implicit {@code putAll} of the given batch must fail with an
 * {@code IgniteSQLException} carrying the NOT NULL message and must store nothing.
 *
 * @param values Batch to put; expected to contain a constraint-violating value.
 * @param expAtomicCacheSize Not used by the current implementation; the cache is always expected
 *      to be empty afterwards.
 * @throws Exception If failed.
 */
private void doAtomicOrImplicitTxPutAll(final Map<Integer, Person> values, int expAtomicCacheSize) throws Exception {
    executeWithAllCaches(new TestClosure() {
        @Override public void run() throws Exception {
            Throwable thrown = GridTestUtils.assertThrowsWithCause(() -> {
                cache.putAll(values);

                return null;
            }, IgniteSQLException.class);

            // Dig the SQL exception out of the cause chain and check its message.
            IgniteSQLException sqlErr = X.cause(thrown, IgniteSQLException.class);

            assertNotNull(sqlErr);

            assertTrue(sqlErr.getMessage().contains(ERR_MSG));

            assertEquals(0, cache.size());
        }
    });
}
/**
 * For every transactional cache and every concurrency/isolation pair: {@code putAll} of the
 * given batch inside an explicit transaction must fail with the NOT NULL error and leave the
 * cache empty.
 *
 * @param values Batch to put; expected to contain a constraint-violating value.
 * @throws Exception If failed.
 */
private void doTxPutAll(Map<Integer, Person> values) throws Exception {
    executeWithAllTxCaches(new TestClosure() {
        @Override public void run() throws Exception {
            GridTestUtils.assertThrows(log, () -> {
                try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
                    cache.putAll(values);

                    tx.commit();
                }

                return null;
            }, CacheException.class, ERR_MSG);

            // Verified after assertThrows(): inside the callable this check sat behind the statement
            // that is expected to throw, so it never executed.
            assertEquals(0, cache.size());
        }
    });
}
/**
 * Runs the closure against every person cache configuration, always with OPTIMISTIC
 * concurrency and READ_COMMITTED isolation.
 *
 * @param clo Test closure.
 * @throws Exception If failed.
 */
private void executeWithAllCaches(TestClosure clo) throws Exception {
    List<CacheConfiguration> cfgs = cacheConfigurations();

    for (CacheConfiguration cfg : cfgs) {
        executeForCache(cfg, clo, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED);
    }
}
/**
 * Runs the closure against every TRANSACTIONAL person cache configuration, once per combination
 * of transaction concurrency and isolation.
 *
 * @param clo Test closure.
 * @throws Exception If failed.
 */
private void executeWithAllTxCaches(final TestClosure clo) throws Exception {
    for (CacheConfiguration cfg : cacheConfigurations()) {
        // Non-transactional caches are skipped.
        if (cfg.getAtomicityMode() != CacheAtomicityMode.TRANSACTIONAL)
            continue;

        for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
            for (TransactionIsolation isolation : TransactionIsolation.values())
                executeForCache(cfg, clo, concurrency, isolation);
        }
    }
}
/**
 * Runs the closure for the given cache on the client node first and then on each server node.
 *
 * @param ccfg Cache configuration.
 * @param clo Test closure.
 * @param concurrency Transaction concurrency handed to the closure.
 * @param isolation Transaction isolation handed to the closure.
 * @throws Exception If failed.
 */
private void executeForCache(CacheConfiguration ccfg, TestClosure clo, TransactionConcurrency concurrency,
    TransactionIsolation isolation) throws Exception {
    executeForNodeAndCache(ccfg, grid(NODE_CLIENT), clo, concurrency, isolation);

    for (int srvIdx = 0; srvIdx < NODE_COUNT; srvIdx++)
        executeForNodeAndCache(ccfg, grid(srvIdx), clo, concurrency, isolation);
}
/**
 * Obtains the cache on the given node (through a near cache when the node is a client and the
 * configuration is partitioned with a near configuration), empties it, then configures and runs
 * the closure with keys 1 and 4.
 *
 * @param ccfg Cache configuration.
 * @param ignite Node to run on.
 * @param clo Test closure.
 * @param concurrency Transaction concurrency handed to the closure.
 * @param isolation Transaction isolation handed to the closure.
 * @throws Exception If failed.
 */
private void executeForNodeAndCache(CacheConfiguration ccfg, Ignite ignite, TestClosure clo,
    TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception {
    String cacheName = ccfg.getName();

    boolean viaNearCache = ignite.configuration().isClientMode() &&
        ccfg.getCacheMode() == CacheMode.PARTITIONED &&
        ccfg.getNearConfiguration() != null;

    IgniteCache cache;

    if (viaNearCache)
        cache = ignite.getOrCreateNearCache(cacheName, ccfg.getNearConfiguration());
    else
        cache = ignite.cache(cacheName);

    // Start every run from an empty cache.
    cache.removeAll();

    assertEquals(0, cache.size());

    clo.configure(ignite, cache, concurrency, isolation);

    log.info("Running test with node " + ignite.name() + ", cache " + cacheName);

    clo.key1 = 1;
    clo.key2 = 4;

    clo.run();
}
/**
 * Executes the SQL statement through the client node's query processor.
 *
 * @param sqlText SQL text.
 * @return All result rows.
 * @throws Exception If failed.
 */
private List<List<?>> executeSql(String sqlText) throws Exception {
    SqlFieldsQuery qry = new SqlFieldsQuery(sqlText);

    return grid(NODE_CLIENT).context().query().querySqlFields(qry, true).getAll();
}
/**
 * Resets state shared between tests: clears every person cache through the client (including
 * its near caches) and drops the SQL tables that test methods create.
 *
 * @throws Exception If failed.
 */
private void cleanup() throws Exception {
    for (CacheConfiguration ccfg : cacheConfigurations()) {
        String cacheName = ccfg.getName();

        if (ccfg.getCacheMode() == CacheMode.PARTITIONED && ccfg.getNearConfiguration() != null) {
            IgniteCache cache = grid(NODE_CLIENT).getOrCreateNearCache(cacheName, ccfg.getNearConfiguration());

            cache.clear();
        }

        grid(NODE_CLIENT).cache(cacheName).clear();
    }

    executeSql("DROP TABLE test IF EXISTS");

    // testNotNullCheckDmlUpdateFromSelect() creates these two tables; drop them as well so they
    // do not leak into subsequent tests.
    executeSql("DROP TABLE src IF EXISTS");
    executeSql("DROP TABLE dest IF EXISTS");
}
/**
 * Verifies the NOT NULL metadata for a field on the {@code NODE_CLIENT} node and on each
 * node with index {@code 0 .. NODE_COUNT - 1}.
 *
 * @param schemaName Schema name.
 * @param tableName Table name.
 * @param fieldName Field expected to be registered as NOT NULL.
 */
private void checkState(String schemaName, String tableName, String fieldName) {
    checkNodeState(grid(NODE_CLIENT), schemaName, tableName, fieldName);

    for (int nodeIdx = 0; nodeIdx < NODE_COUNT; nodeIdx++)
        checkNodeState(grid(nodeIdx), schemaName, tableName, fieldName);
}
/**
 * Asserts that, on the given node, the query entity for the table lists the field among
 * its NOT NULL fields.
 * <p>
 * The cache name is derived from the table name when the schema equals
 * {@code QueryUtils.DFLT_SCHEMA}; otherwise the schema name itself is used as the cache name.
 *
 * @param node Node whose cache descriptor is inspected.
 * @param schemaName Schema name.
 * @param tableName Table name.
 * @param fieldName Field expected to be registered as NOT NULL.
 */
private void checkNodeState(IgniteEx node, String schemaName, String tableName, String fieldName) {
    String cacheName;

    if (F.eq(schemaName, QueryUtils.DFLT_SCHEMA))
        cacheName = QueryUtils.createTableCacheName(schemaName, tableName);
    else
        cacheName = schemaName;

    DynamicCacheDescriptor cacheDesc = node.context().cache().cacheDescriptor(cacheName);

    assertNotNull("Cache descriptor not found", cacheDesc);

    QuerySchema qrySchema = cacheDesc.schema();

    assertNotNull(qrySchema);

    // Pick the first entity whose table name matches.
    QueryEntity matched = null;

    for (QueryEntity candidate : qrySchema.entities()) {
        if (!F.eq(tableName, candidate.getTableName()))
            continue;

        matched = candidate;

        break;
    }

    assertNotNull(matched);

    Set<String> notNullFields = matched.getNotNullFields();

    assertNotNull(notNullFields);
    assertTrue(notNullFields.contains(fieldName));
}
/**
 * Test value type with two SQL-visible fields; {@code name} is annotated as NOT NULL.
 * Equality and hash code are based on both fields.
 */
public static class Person {
    /** Person name, annotated with {@code notNull = true}. */
    @QuerySqlField(notNull = true)
    private String name;

    /** Person age. */
    @QuerySqlField
    private int age;

    /**
     * @param name Person name (may be {@code null}).
     * @param age Person age.
     */
    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    /** {@inheritDoc} */
    @Override public int hashCode() {
        // A null name contributes zero to the hash.
        int nameHash = name != null ? name.hashCode() : 0;

        return nameHash ^ age;
    }

    /** {@inheritDoc} */
    @Override public boolean equals(Object o) {
        if (o == this)
            return true;

        if (o instanceof Person) {
            Person that = (Person)o;

            return that.age == age && F.eq(that.name, name);
        }

        return false;
    }
}
/** */
public class TestEntryProcessor implements EntryProcessor<Integer, Person, Object> {
/** Value to set. */
private Person value;
/** */
public TestEntryProcessor(Person value) {
this.value = value;
}
/** {@inheritDoc} */
@Override public Object process(MutableEntry<Integer, Person> entry,
Object... objects) throws EntryProcessorException {
if (value == null)
entry.remove();
else
entry.setValue(value);
return null;
}
}
/**
 * Base class for a test scenario. {@link #configure} stores the node, cache and transaction
 * settings to use; {@link #run()} holds the scenario itself and is supplied by subclasses.
 */
public abstract class TestClosure {
    /** Node the scenario runs on. */
    protected Ignite ignite;

    /** Cache the scenario operates on. */
    protected IgniteCache<Integer, Person> cache;

    /** Transaction concurrency for the scenario. */
    protected TransactionConcurrency concurrency;

    /** Transaction isolation for the scenario. */
    protected TransactionIsolation isolation;

    /** First key available to the scenario. */
    public int key1;

    /** Second key available to the scenario. */
    public int key2;

    /**
     * Stores the execution context for the next {@link #run()}.
     *
     * @param ignite Node to run on.
     * @param cache Cache to operate on.
     * @param concurrency Transaction concurrency.
     * @param isolation Transaction isolation.
     */
    public void configure(Ignite ignite, IgniteCache<Integer, Person> cache, TransactionConcurrency concurrency,
        TransactionIsolation isolation) {
        this.isolation = isolation;
        this.concurrency = concurrency;
        this.cache = cache;
        this.ignite = ignite;
    }

    /**
     * Executes the scenario.
     *
     * @throws Exception If the scenario fails.
     */
    public abstract void run() throws Exception;
}
/**
* Test cache store stub.
*/
private static class TestStore extends CacheStoreAdapter<Integer, Person> {
/** {@inheritDoc} */
@Override public void loadCache(IgniteBiInClosure<Integer, Person> clo, @Nullable Object... args) {
// No-op
}
/** {@inheritDoc} */
@Override public Person load(Integer key) {
return null;
}
/** {@inheritDoc} */
@Override public void write(javax.cache.Cache.Entry<? extends Integer, ? extends Person> e) {
// No-op
}
/** {@inheritDoc} */
@Override public void delete(Object key) {
// No-op
}
}
/**
 * Cache interceptor stub: every "before"/"get" callback returns {@code null} and every
 * "after" callback does nothing.
 * <p>
 * NOTE(review): under the CacheInterceptor contract a {@code null} result from these callbacks
 * may be meaningful (e.g. cancelling an update) - confirm the stub is only used where the
 * callbacks are never actually invoked.
 */
private static class TestInterceptor implements CacheInterceptor<Integer, Person> {
    /**
     * @param key Key being read.
     * @param val Current value.
     * @return Always {@code null}.
     */
    @Nullable @Override public Person onGet(Integer key, @Nullable Person val) {
        return null;
    }

    /**
     * @param entry Existing entry.
     * @param newVal Value about to be put.
     * @return Always {@code null}.
     */
    @Nullable @Override public Person onBeforePut(Cache.Entry<Integer, Person> entry, Person newVal) {
        return null;
    }

    /** Does nothing. */
    @Override public void onAfterPut(Cache.Entry<Integer, Person> entry) {
        // Intentionally empty.
    }

    /**
     * @param entry Entry about to be removed.
     * @return Always {@code null}.
     */
    @Nullable @Override public IgniteBiTuple<Boolean, Person> onBeforeRemove(Cache.Entry<Integer, Person> entry) {
        return null;
    }

    /** Does nothing. */
    @Override public void onAfterRemove(Cache.Entry<Integer, Person> entry) {
        // Intentionally empty.
    }
}
}