blob: 2a62f15854ae1588e0863d44b7fb281b47b223ec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.database;
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.query.QuerySchema;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
/**
*
*/
@WithSystemProperty(key = IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK, value = "true")
public class IgnitePersistentStoreSchemaLoadTest extends GridCommonAbstractTest {
/** Cache name. */
private static final String TMPL_NAME = "test_cache*";
/** Table name. */
private static final String TBL_NAME = Person.class.getSimpleName();
/** Name of the cache created with {@code CREATE TABLE}. */
private static final String SQL_CACHE_NAME = QueryUtils.createTableCacheName(QueryUtils.DFLT_SCHEMA, TBL_NAME);
/** Name of the cache created upon cluster start. */
private static final String STATIC_CACHE_NAME = TBL_NAME;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
cfg.setCacheConfiguration(cacheCfg(TMPL_NAME));
DataStorageConfiguration pCfg = new DataStorageConfiguration();
pCfg.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setPersistenceEnabled(true)
.setMaxSize(100L * 1024 * 1024));
pCfg.setCheckpointFrequency(1000);
cfg.setDataStorageConfiguration(pCfg);
return cfg;
}
/**
* Create node configuration with a cache pre-configured.
* @param gridName Node name.
* @return Node configuration with a cache pre-configured.
* @throws Exception if failed.
*/
@SuppressWarnings("unchecked")
private IgniteConfiguration getConfigurationWithStaticCache(String gridName) throws Exception {
IgniteConfiguration cfg = getConfiguration(gridName);
CacheConfiguration ccfg = cacheCfg(STATIC_CACHE_NAME);
ccfg.setIndexedTypes(Integer.class, Person.class);
ccfg.setSqlEscapeAll(true);
cfg.setCacheConfiguration(ccfg);
return optimize(cfg);
}
/** */
private CacheConfiguration cacheCfg(String name) {
CacheConfiguration<?, ?> cfg = new CacheConfiguration<>();
cfg.setName(name);
cfg.setRebalanceMode(CacheRebalanceMode.NONE);
cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
stopAllGrids();
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
cleanPersistenceDir();
}
/** */
@Test
public void testDynamicSchemaChangesPersistence() throws Exception {
checkSchemaStateAfterNodeRestart(false);
}
/** */
@Test
public void testDynamicSchemaChangesPersistenceWithAliveCluster() throws Exception {
checkSchemaStateAfterNodeRestart(true);
}
/** */
@Test
public void testDynamicSchemaChangesPersistenceWithStaticCache() throws Exception {
IgniteEx node = startGrid(getConfigurationWithStaticCache(getTestIgniteInstanceName(0)));
node.active(true);
IgniteCache cache = node.cache(STATIC_CACHE_NAME);
assertNotNull(cache);
CountDownLatch cnt = checkpointLatch(node);
assertEquals(0, indexCnt(node, STATIC_CACHE_NAME));
makeDynamicSchemaChanges(node, STATIC_CACHE_NAME);
checkDynamicSchemaChanges(node, STATIC_CACHE_NAME);
cnt.await();
stopGrid(0);
// Restarting with no-cache configuration - otherwise stored configurations
// will be ignored due to cache names duplication.
node = startGrid(0);
node.active(true);
checkDynamicSchemaChanges(node, STATIC_CACHE_NAME);
}
/**
* Perform test with cache created with {@code CREATE TABLE}.
* @param aliveCluster Whether there should remain an alive node when tested node is restarted.
* @throws Exception if failed.
*/
private void checkSchemaStateAfterNodeRestart(boolean aliveCluster) throws Exception {
IgniteEx node = startGrid(0);
node.active(true);
if (aliveCluster)
startGrid(1);
CountDownLatch cnt = checkpointLatch(node);
node.context().query().querySqlFields(
new SqlFieldsQuery("create table \"Person\" (\"id\" int primary key, \"name\" varchar)"), false);
assertEquals(0, indexCnt(node, SQL_CACHE_NAME));
makeDynamicSchemaChanges(node, QueryUtils.DFLT_SCHEMA);
checkDynamicSchemaChanges(node, SQL_CACHE_NAME);
cnt.await();
stopGrid(0);
node = startGrid(0);
node.active(true);
checkDynamicSchemaChanges(node, SQL_CACHE_NAME);
node.context().query().querySqlFields(new SqlFieldsQuery("drop table \"Person\""), false).getAll();
}
/** */
private int indexCnt(IgniteEx node, String cacheName) {
DynamicCacheDescriptor desc = node.context().cache().cacheDescriptor(cacheName);
int cnt = 0;
if (desc != null) {
QuerySchema schema = desc.schema();
if (schema != null) {
for (QueryEntity entity : schema.entities())
cnt += entity.getIndexes().size();
}
}
return cnt;
}
/** */
private int colsCnt(IgniteEx node, String cacheName) {
DynamicCacheDescriptor desc = node.context().cache().cacheDescriptor(cacheName);
int cnt = 0;
if (desc != null) {
QuerySchema schema = desc.schema();
if (schema != null) {
for (QueryEntity entity : schema.entities())
cnt += entity.getFields().size();
}
}
return cnt;
}
/**
* @param node Node whose checkpoint to wait for.
* @return Latch released when checkpoint happens.
*/
private CountDownLatch checkpointLatch(IgniteEx node) {
final CountDownLatch cnt = new CountDownLatch(1);
GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)node.context().cache().context().database();
db.addCheckpointListener(new DbCheckpointListener() {
@Override public void onMarkCheckpointBegin(Context ctx) {
cnt.countDown();
}
@Override public void beforeCheckpointBegin(Context ctx) throws IgniteCheckedException {
}
@Override public void onCheckpointBegin(Context ctx) {
/* No-op. */
}
});
return cnt;
}
/**
* Create dynamic index and column.
* @param node Node.
* @param schema Schema name.
*/
private void makeDynamicSchemaChanges(IgniteEx node, String schema) {
node.context().query().querySqlFields(
new SqlFieldsQuery("create index \"my_idx\" on \"Person\" (\"id\", \"name\")").setSchema(schema), false)
.getAll();
node.context().query().querySqlFields(
new SqlFieldsQuery("alter table \"Person\" add column (\"age\" int, \"city\" char)")
.setSchema(schema), false).getAll();
node.context().query().querySqlFields(
new SqlFieldsQuery("alter table \"Person\" drop column \"city\"").setSchema(schema), false)
.getAll();
}
/**
* Check that dynamically created schema objects are in place.
* @param node Node.
* @param cacheName Cache name.
*/
private void checkDynamicSchemaChanges(IgniteEx node, String cacheName) {
assertEquals(1, indexCnt(node, cacheName));
assertEquals(3, colsCnt(node, cacheName));
}
/**
*
*/
protected static class Person implements Serializable {
/** */
private static final long serialVersionUID = 0L;
/** */
@SuppressWarnings("unused")
private Person() {
// No-op.
}
/** */
public Person(int id) {
this.id = id;
}
/** */
@QuerySqlField
protected int id;
/** */
@QuerySqlField
protected String name;
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
IgnitePersistentStoreSchemaLoadTest.Person person = (IgnitePersistentStoreSchemaLoadTest.Person) o;
return id == person.id && (name != null ? name.equals(person.name) : person.name == null);
}
/** {@inheritDoc} */
@Override public int hashCode() {
int res = id;
res = 31 * res + (name != null ? name.hashCode() : 0);
return res;
}
}
}