blob: 52a1d54863dc69670cb04d0bf1c68cc73327ea43 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.integration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.cache.Cache;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreSession;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.resources.CacheStoreSessionResource;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
/**
*
*/
public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends IgniteCacheAbstractTest {
/** */
private static final String CACHE_NAME1 = "cache1";
/** */
private static volatile CountDownLatch latch;
/** */
protected static volatile CountDownLatch entLatch;
/** */
private static volatile ExpectedData expData;
/** {@inheritDoc} */
@Override protected int gridCount() {
return 1;
}
/** {@inheritDoc} */
@Override protected CacheMode cacheMode() {
return CacheMode.PARTITIONED;
}
/** {@inheritDoc} */
@Override protected NearCacheConfiguration nearConfiguration() {
return null;
}
/**
* @param igniteInstanceName Ignite instance name.
* @return Cache configuration.
* @throws Exception In case of error.
*/
@Override @SuppressWarnings("unchecked")
protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception {
CacheConfiguration ccfg0 = super.cacheConfiguration(igniteInstanceName);
ccfg0.setReadThrough(true);
ccfg0.setWriteThrough(true);
ccfg0.setWriteBehindBatchSize(10);
ccfg0.setWriteBehindFlushSize(10);
ccfg0.setWriteBehindFlushFrequency(600);
ccfg0.setWriteBehindEnabled(true);
ccfg0.setCacheStoreFactory(singletonFactory(new TestStore()));
return ccfg0;
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
assert cfg.getCacheConfiguration().length == 1;
CacheConfiguration ccfg0 = cacheConfiguration(igniteInstanceName);
ccfg0.setName(DEFAULT_CACHE_NAME);
CacheConfiguration ccfg1 = cacheConfiguration(igniteInstanceName);
ccfg1.setName(CACHE_NAME1);
cfg.setCacheConfiguration(ccfg0, ccfg1);
return cfg;
}
/**
* @throws Exception If failed.
*/
@Test
public void testSession() throws Exception {
testCache(DEFAULT_CACHE_NAME);
testCache(CACHE_NAME1);
}
/**
* @param cacheName Cache name.
* @throws Exception If failed.
*/
private void testCache(String cacheName) throws Exception {
IgniteCache<Integer, Integer> cache = ignite(0).cache(cacheName);
try {
latch = new CountDownLatch(2);
entLatch = new CountDownLatch(11);
expData = new ExpectedData("writeAll", cacheName);
for (int i = 0; i < 11; i++)
cache.put(i, i);
assertTrue(latch.await(10_000, TimeUnit.MILLISECONDS));
assertTrue(entLatch.await(10_000, TimeUnit.MILLISECONDS));
}
finally {
latch = null;
entLatch = null;
}
try {
latch = new CountDownLatch(2);
entLatch = new CountDownLatch(11);
expData = new ExpectedData("deleteAll", cacheName);
for (int i = 0; i < 11; i++)
cache.remove(i);
assertTrue(latch.await(10_000, TimeUnit.MILLISECONDS));
assertTrue(entLatch.await(10_000, TimeUnit.MILLISECONDS));
}
finally {
latch = null;
entLatch = null;
}
}
/**
*
*/
protected class TestStore implements CacheStore<Object, Object> {
/** Auto-injected store session. */
@CacheStoreSessionResource
private CacheStoreSession ses;
/** */
@IgniteInstanceResource
protected Ignite ignite;
/** {@inheritDoc} */
@Override public void loadCache(IgniteBiInClosure<Object, Object> clo, @Nullable Object... args) {
fail();
}
/** {@inheritDoc} */
@Override public void sessionEnd(boolean commit) throws CacheWriterException {
fail();
}
/** {@inheritDoc} */
@Override public Object load(Object key) throws CacheLoaderException {
fail();
return null;
}
/** {@inheritDoc} */
@Override public Map<Object, Object> loadAll(Iterable<?> keys) throws CacheLoaderException {
fail();
return null;
}
/** {@inheritDoc} */
@Override public void write(Cache.Entry<?, ?> entry) throws CacheWriterException {
fail();
}
/** {@inheritDoc} */
@Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) throws CacheWriterException {
log.info("writeAll: " + entries);
assertTrue("Unexpected entries: " + entries, entries.size() == 10 || entries.size() == 1);
checkSession("writeAll");
for (int i = 0; i < entries.size(); i++)
entLatch.countDown();
}
/** {@inheritDoc} */
@Override public void delete(Object key) throws CacheWriterException {
fail();
}
/** {@inheritDoc} */
@Override public void deleteAll(Collection<?> keys) throws CacheWriterException {
log.info("deleteAll: " + keys);
assertTrue("Unexpected keys: " + keys, keys.size() == 10 || keys.size() == 1);
checkSession("deleteAll");
for (int i = 0; i < keys.size(); i++)
entLatch.countDown();
}
/**
* @return Store session.
*/
private CacheStoreSession session() {
return ses;
}
/**
* @param mtd Called stored method.
*/
protected void checkSession(String mtd) {
assertNotNull(ignite);
CacheStoreSession ses = session();
assertNotNull(ses);
log.info("Cache: " + ses.cacheName());
assertFalse(ses.isWithinTransaction());
assertNull(ses.transaction());
assertNotNull(expData);
assertEquals(mtd, expData.expMtd);
assertEquals(expData.expCacheName, ses.cacheName());
assertNotNull(ses.properties());
ses.properties().put(1, "test");
assertEquals("test", ses.properties().get(1));
latch.countDown();
}
}
/**
*
*/
static class ExpectedData {
/** */
private final String expMtd;
/** */
private final String expCacheName;
/**
* @param expMtd Expected method.
* @param expCacheName Expected cache name.
*/
public ExpectedData(String expMtd, String expCacheName) {
this.expMtd = expMtd;
this.expCacheName = expCacheName;
}
}
}