blob: b80e1fa51d5b5163b2fba54919e0e8acf365de42 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.store;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Random;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
/**
 * This class provides non write coalescing tests for
 * {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore}.
 * <p>
 * The cache is configured with write-behind enabled and write coalescing disabled. The test increments
 * cache values asynchronously while {@link TestIncrementStore} asserts that every value written over an
 * already stored one is exactly one greater than its predecessor.
 */
public class IgnteCacheClientWriteBehindStoreNonCoalescingTest extends IgniteCacheAbstractTest {
    /** Exclusive upper bound of the keys that are picked for asynchronous increments. */
    private static final int UPDATED_KEYS_RANGE = 100;

    /** Number of asynchronous increment operations started by the test. */
    private static final int UPDATES_CNT = 1000;

    /** Source of random keys for {@link #updateKey(IgniteCache)}. */
    private static final Random rnd = new Random();

    /** {@inheritDoc} */
    @Override protected int gridCount() {
        return 1;
    }

    /** {@inheritDoc} */
    @Override protected CacheMode cacheMode() {
        // NOTE(review): null presumably leaves the cache mode to the base configuration default - confirm.
        return null;
    }

    /** {@inheritDoc} */
    @Override protected CacheAtomicityMode atomicityMode() {
        return ATOMIC;
    }

    /** {@inheritDoc} */
    @Override protected NearCacheConfiguration nearConfiguration() {
        return null;
    }

    /** {@inheritDoc} */
    @Override protected Factory<CacheStore> cacheStoreFactory() {
        return new TestIncrementStoreFactory();
    }

    /** {@inheritDoc} */
    @Override protected boolean writeBehindEnabled() {
        return true;
    }

    /** {@inheritDoc} */
    @Override protected boolean writeBehindCoalescing() {
        return false;
    }

    /**
     * Populates the cache, then starts {@link #UPDATES_CNT} asynchronous increments of random keys and
     * waits for all of them to complete.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testNonCoalescingIncrementing() throws Exception {
        Ignite ignite = grid(0);

        IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);

        // Expected value goes first so that a failure message reads correctly.
        assertEquals(TestIncrementStoreFactory.class,
            cache.getConfiguration(CacheConfiguration.class).getCacheStoreFactory().getClass());

        // Initial population: every key gets a value equal to the key itself.
        for (int i = 0; i < CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_SIZE * 2; i++)
            cache.put(i, i);

        Collection<IgniteFuture<?>> futs = new ArrayList<>(UPDATES_CNT);

        for (int i = 0; i < UPDATES_CNT; i++)
            futs.add(updateKey(cache));

        // Wait for every asynchronous increment; get() rethrows a failure of the operation.
        for (IgniteFuture<?> fut : futs)
            fut.get();
    }

    /**
     * Update random key in async mode.
     *
     * @param cache Cache to use.
     * @return Future of the started increment operation.
     */
    private IgniteFuture<?> updateKey(IgniteCache<Integer, Integer> cache) {
        // Using EntryProcessor via invokeAsync to increment the value of a single random key in place.
        return cache.invokeAsync(rnd.nextInt(UPDATED_KEYS_RANGE), new EntryProcessor<Integer, Integer, Object>() {
            @Override public Object process(MutableEntry<Integer, Integer> entry, Object... arguments)
                throws EntryProcessorException {
                entry.setValue(entry.getValue() + 1);

                return null;
            }
        });
    }

    /**
     * Test increment store factory.
     */
    public static class TestIncrementStoreFactory implements Factory<CacheStore> {
        /** {@inheritDoc} */
        @Override public CacheStore create() {
            return new TestIncrementStore();
        }
    }

    /**
     * Test cache store to validate int value incrementing.
     * <p>
     * NOTE(review): {@code storeMap} is not declared in this file; presumably it is a shared map inherited
     * from {@link IgniteCacheAbstractTest} - confirm.
     */
    public static class TestIncrementStore extends CacheStoreAdapter<Object, Object> {
        /** {@inheritDoc} */
        @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) {
            for (Map.Entry<Object, Object> e : storeMap.entrySet())
                clo.apply(e.getKey(), e.getValue());
        }

        /** {@inheritDoc} */
        @Override public Object load(Object key) {
            return storeMap.get(key);
        }

        /**
         * Stores the entry and, when a value was already stored for the key, asserts that the new value
         * is exactly the previous value plus one.
         *
         * @param entry Entry to write.
         */
        @Override public void write(Cache.Entry<? extends Object, ? extends Object> entry) {
            Object oldVal = storeMap.put(entry.getKey(), entry.getValue());

            // The first write of a key has nothing to compare with.
            if (oldVal != null) {
                // Primitive locals make the numeric (not reference) comparison explicit.
                int oldInt = (Integer)oldVal;
                int newInt = (Integer)entry.getValue();

                assertTrue(
                    "newValue(" + newInt + ") != oldValue(" + oldInt + ")+1 !",
                    newInt == oldInt + 1);
            }
        }

        /** {@inheritDoc} */
        @Override public void delete(Object key) {
            storeMap.remove(key);
        }
    }
}