/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.io.Serializable;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Stack;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.query.h2.sql.AbstractH2CompareQueryTest;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
* Tests cross-cache SQL joins over randomly generated data for every combination of cache mode
* and backup count, comparing Ignite query results against the same queries executed in H2.
*/
public class IgniteCacheCrossCacheJoinRandomTest extends AbstractH2CompareQueryTest {
/** Base number of objects; each cache holds OBJECTS * 2 entries with parent IDs in the range [0, OBJECTS]. */
private static final int OBJECTS = 200;
/** Maximum number of caches participating in a join. */
private static final int MAX_CACHES = 5;
/** Random number generator; the seed is logged so failed runs can be reproduced. */
private static Random rnd;
/** Pre-generated data for each cache, shared by all tests. */
private static List<Map<Integer, Integer>> cachesData;
/** Cache mode and backup count combinations used for joins of 2 and 3 caches. */
private static final List<T2<CacheMode, Integer>> MODES_1 = F.asList(
//new T2<>(REPLICATED, 0),
new T2<>(PARTITIONED, 0),
new T2<>(PARTITIONED, 1),
new T2<>(PARTITIONED, 2));
/** Reduced set of combinations used for joins of 4 and 5 caches, to keep the total number of configurations manageable. */
private static final List<T2<CacheMode, Integer>> MODES_2 = F.asList(
//new T2<>(REPLICATED, 0),
new T2<>(PARTITIONED, 0),
new T2<>(PARTITIONED, 1));
/** {@inheritDoc} */
@Override protected void createCaches() {
// No-op.
}
/** {@inheritDoc} */
@Override protected void initCacheAndDbData() throws Exception {
// No-op.
}
/** {@inheritDoc} */
@Override protected void checkAllDataEquals() throws Exception {
// No-op.
}
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return 10 * 60_000;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
startClientGrid(SRVS);
long seed = System.currentTimeMillis();
rnd = new Random(seed);
log.info("Random seed: " + seed);
cachesData = new ArrayList<>(MAX_CACHES);
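// Data is generated once and loaded into H2 here; Ignite caches are created, populated and
// destroyed per configuration combination in checkJoinQueries().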
for (int i = 0; i < MAX_CACHES; i++) {
Map<Integer, Integer> data = createData(OBJECTS * 2);
insertH2(data, i);
cachesData.add(data);
}
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
cachesData = null;
super.afterTestsStopped();
}
/** {@inheritDoc} */
@Override protected Statement initializeH2Schema() throws SQLException {
Statement st = super.initializeH2Schema();
for (int i = 0; i < MAX_CACHES; i++) {
st.execute("CREATE SCHEMA \"cache" + i + "\"");
st.execute("create table \"cache" + i + "\".TESTOBJECT" +
" (_key int not null," +
" _val other not null," +
" parentId int)");
}
return st;
}
/**
* @param name Cache name.
* @param cacheMode Cache mode.
* @param backups Number of backups.
* @return Cache configuration.
*/
private CacheConfiguration configuration(String name, CacheMode cacheMode, int backups) {
CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
ccfg.setName(name);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setAtomicityMode(ATOMIC);
ccfg.setCacheMode(cacheMode);
if (cacheMode == PARTITIONED)
ccfg.setBackups(backups);
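// Expose TestObject.parentId to SQL and index it so the join condition on parentId can run against an index.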
QueryEntity entity = new QueryEntity();
entity.setKeyType(Integer.class.getName());
entity.setValueType(TestObject.class.getName());
entity.addQueryField("parentId", Integer.class.getName(), null);
entity.setIndexes(F.asList(new QueryIndex("parentId")));
ccfg.setQueryEntities(F.asList(entity));
return ccfg;
}
/**
* @throws Exception If failed.
*/
@Test
public void testJoin2Caches() throws Exception {
testJoin(2, MODES_1);
}
/**
* @throws Exception If failed.
*/
@Test
public void testJoin3Caches() throws Exception {
testJoin(3, MODES_1);
}
/**
* @throws Exception If failed.
*/
@Test
public void testJoin4Caches() throws Exception {
testJoin(4, MODES_2);
}
/**
* @throws Exception If failed.
*/
@Test
public void testJoin5Caches() throws Exception {
testJoin(5, MODES_2);
}
/**
* @param caches Number of caches.
* @param allModes Cache modes.
* @throws Exception If failed.
*/
private void testJoin(int caches, List<T2<CacheMode, Integer>> allModes) throws Exception {
checkJoin(cachesData, allModes, new Stack<T2<CacheMode, Integer>>(), caches);
}
/**
* @param cachesData Data for all caches.
* @param allModes Cache mode and backup count combinations to test.
* @param modes Currently selected combinations (recursion state).
* @param caches Number of caches to join.
* @throws Exception If failed.
*/
private void checkJoin(List<Map<Integer, Integer>> cachesData,
List<T2<CacheMode, Integer>> allModes,
Stack<T2<CacheMode, Integer>> modes,
int caches) throws Exception {
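// Depth-first enumeration: push one combination per cache until the stack holds 'caches' entries,
// then run the join queries for that set of configurations.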
if (modes.size() == caches) {
List<CacheConfiguration> ccfgs = new ArrayList<>();
for (int i = 0; i < modes.size(); i++) {
T2<CacheMode, Integer> mode = modes.get(i);
CacheConfiguration ccfg = configuration("cache" + i, mode.get1(), mode.get2());
ccfgs.add(ccfg);
}
log.info("Check configurations: " + modes);
checkJoinQueries(ccfgs, cachesData);
}
else {
for (T2<CacheMode, Integer> mode : allModes) {
modes.push(mode);
checkJoin(cachesData, allModes, modes, caches);
modes.pop();
}
}
}
/**
* @param ccfgs Configurations.
* @param cachesData Caches data.
* @throws Exception If failed.
*/
private void checkJoinQueries(List<CacheConfiguration> ccfgs, List<Map<Integer, Integer>> cachesData) throws Exception {
Ignite client = ignite(SRVS);
final int CACHES = ccfgs.size();
try {
IgniteCache cache = null;
boolean hasReplicated = false;
for (int i = 0; i < CACHES; i++) {
CacheConfiguration ccfg = ccfgs.get(i);
IgniteCache cache0 = client.createCache(ccfg);
if (ccfg.getCacheMode() == REPLICATED)
hasReplicated = true;
if (cache == null && ccfg.getCacheMode() == PARTITIONED)
cache = cache0;
insertCache(cachesData.get(i), cache0);
}
boolean distributedJoin = true;
// Do not use distributed join if all caches are REPLICATED.
if (cache == null) {
cache = client.cache(ccfgs.get(0).getName());
distributedJoin = false;
}
Object[] args = {};
compareQueryRes0(cache, createQuery(CACHES, false, null), distributedJoin, false, args, Ordering.RANDOM);
if (!hasReplicated) {
compareQueryRes0(cache, createQuery(CACHES, false, null), distributedJoin, true, args, Ordering.RANDOM);
compareQueryRes0(cache, createQuery(CACHES, true, null), distributedJoin, true, args, Ordering.RANDOM);
}
Map<Integer, Integer> data = cachesData.get(CACHES - 1);
final int QRY_CNT = CACHES > 4 ? 2 : 50;
int cnt = 0;
for (Integer objId : data.keySet()) {
compareQueryRes0(cache, createQuery(CACHES, false, objId), distributedJoin, false, args, Ordering.RANDOM);
if (!hasReplicated) {
compareQueryRes0(cache, createQuery(CACHES, false, objId), distributedJoin, true, args, Ordering.RANDOM);
compareQueryRes0(cache, createQuery(CACHES, true, objId), distributedJoin, true, args, Ordering.RANDOM);
}
if (cnt++ == QRY_CNT)
break;
}
}
finally {
for (CacheConfiguration ccfg : ccfgs)
client.destroyCache(ccfg.getName());
}
}
/**
* @param caches Number of caches to join.
* @param outer If {@code true}, creates a left outer join query, otherwise an inner join.
* @param objId Optional object ID to additionally filter the last joined cache by, or {@code null}.
* @return SQL query text.
*/
@SuppressWarnings("StringConcatenationInsideStringBufferAppend")
private String createQuery(int caches, boolean outer, @Nullable Integer objId) {
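// Example: createQuery(3, false, null) generates (line breaks added for readability):
// select o0._key, o1._key, o2._key from "cache0".TestObject o0
// inner join "cache1".TestObject o1 on (o0.parentId=o1._key)
// inner join "cache2".TestObject o2 on (o1.parentId=o2._key)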
StringBuilder qry = new StringBuilder("select ");
for (int i = 0; i < caches; i++) {
if (i != 0)
qry.append(", ");
qry.append("o" + i + "._key");
}
qry.append(" from \"cache0\".TestObject o0 ");
for (int i = 1; i < caches; i++) {
String cacheName = "cache" + i;
String cur = "o" + i;
String prev = "o" + (i - 1);
qry.append(outer ? "left outer join " : "inner join ");
qry.append("\"" + cacheName + "\".TestObject " + cur);
if (i == caches - 1 && objId != null)
qry.append(" on (" + prev + ".parentId=" + cur + "._key and " + cur + "._key=" + objId + ") ");
else
qry.append(" on (" + prev + ".parentId=" + cur + "._key) ");
}
return qry.toString();
}
/**
* @param data Data.
* @param cache Cache.
*/
private void insertCache(Map<Integer, Integer> data, IgniteCache<Object, Object> cache) {
for (Map.Entry<Integer, Integer> e : data.entrySet())
cache.put(e.getKey(), new TestObject(e.getValue()));
}
/**
* @param data Data.
* @param cache Cache index.
* @throws Exception If failed.
*/
private void insertH2(Map<Integer, Integer> data, int cache) throws Exception {
for (Map.Entry<Integer, Integer> e : data.entrySet()) {
try (PreparedStatement st = conn.prepareStatement("insert into \"cache" + cache + "\".TESTOBJECT " +
"(_key, _val, parentId) values(?, ?, ?)")) {
st.setObject(1, e.getKey());
st.setObject(2, new TestObject(e.getValue()));
st.setObject(3, e.getValue());
st.executeUpdate();
}
}
}
/**
* @param cnt Objects count.
* @return Generated data.
*/
private Map<Integer, Integer> createData(int cnt) {
Map<Integer, Integer> res = new LinkedHashMap<>();
while (res.size() < cnt)
res.put(rnd.nextInt(cnt), rnd.nextInt(OBJECTS + 1));
return res;
}
/**
* Test value object with a single indexed field used in join conditions.
*/
static class TestObject implements Serializable {
/** Parent object ID used to join with the key of the next cache. */
int parentId;
/**
* @param parentId Parent object ID.
*/
public TestObject(int parentId) {
this.parentId = parentId;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TestObject.class, this);
}
}
}