blob: eb3c0677563a2b692a891c67a5f68a09a35217ee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreSession;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.resources.CacheStoreSessionResource;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;
/**
* Test store.
*/
public final class GridCacheTestStore implements CacheStore<Integer, String> {
/** */
@CacheStoreSessionResource
private CacheStoreSession ses;
/** Store. */
private final Map<Integer, String> map;
/** Transactions. */
private final Collection<Transaction> txs = new GridConcurrentHashSet<>();
/** Last method called. */
private String lastMtd;
/** */
private long ts = System.currentTimeMillis();
/** {@link #load(Object)} method call counter .*/
private AtomicInteger loadCnt = new AtomicInteger();
/** {@link #write(Cache.Entry)} method call counter .*/
private AtomicInteger putCnt = new AtomicInteger();
/** {@link #writeAll(Collection)} method call counter .*/
private AtomicInteger putAllCnt = new AtomicInteger();
/** Flag indicating if methods of this store should fail. */
private volatile boolean shouldFail;
/** Configurable delay to simulate slow storage. */
private int operationDelay;
/**
* @param map Underlying store map.
*/
public GridCacheTestStore(Map<Integer, String> map) {
this.map = map;
}
/**
* Default constructor.
*/
public GridCacheTestStore() {
map = new ConcurrentHashMap<>();
}
/**
* @return Underlying map.
*/
public Map<Integer, String> getMap() {
return Collections.unmodifiableMap(map);
}
/**
* Sets a flag indicating if methods of this class should fail with {@link IgniteCheckedException}.
*
* @param shouldFail {@code true} if should fail.
*/
public void setShouldFail(boolean shouldFail) {
this.shouldFail = shouldFail;
}
/**
* Sets delay that this store should wait on each operation.
*
* @param operationDelay If zero, no delay applied, positive value means
* delay in milliseconds.
*/
public void setOperationDelay(int operationDelay) {
assert operationDelay >= 0;
this.operationDelay = operationDelay;
}
/**
* @return Transactions.
*/
public Collection<Transaction> transactions() {
return txs;
}
/**
*
* @return Last method called.
*/
public String getLastMethod() {
return lastMtd;
}
/**
* @return Last timestamp.
*/
public long getTimestamp() {
return ts;
}
/**
* @return Integer timestamp.
*/
public int getStart() {
return Math.abs((int)ts);
}
/**
* Sets last method to <tt>null</tt>.
*/
public void resetLastMethod() {
lastMtd = null;
}
/**
* Resets timestamp.
*/
public void resetTimestamp() {
ts = System.currentTimeMillis();
}
/**
* Resets the store to initial state.
*/
public void reset() {
lastMtd = null;
map.clear();
loadCnt.set(0);
putCnt.set(0);
putAllCnt.set(0);
ts = System.currentTimeMillis();
txs.clear();
}
/**
* @return Count of {@link #load(Object)} method calls since last reset.
*/
public int getLoadCount() {
return loadCnt.get();
}
/**
* @return Count of {@link #write(Cache.Entry)} method calls since last reset.
*/
public int getPutCount() {
return putCnt.get();
}
/**
* @return Count of {@link #writeAll(Collection)} method calls since last reset.
*/
public int getPutAllCount() {
return putAllCnt.get();
}
/** {@inheritDoc} */
@Override public String load(Integer key) {
checkTx(session(), true);
lastMtd = "load";
checkOperation();
loadCnt.incrementAndGet();
return map.get(key);
}
/** {@inheritDoc} */
@Override public void loadCache(IgniteBiInClosure<Integer, String> clo, Object[] args) {
lastMtd = "loadAllFull";
checkOperation();
int start = getStart();
int cnt = (Integer)args[0];
for (int i = start; i < start + cnt; i++) {
map.put(i, Integer.toString(i));
clo.apply(i, Integer.toString(i));
}
}
/** {@inheritDoc} */
@Override public Map<Integer, String> loadAll(Iterable<? extends Integer> keys) {
checkTx(session(), true);
lastMtd = "loadAll";
checkOperation();
Map<Integer, String> loaded = new HashMap<>();
for (Integer key : keys) {
String val = map.get(key);
if (val != null)
loaded.put(key, val);
}
return loaded;
}
/** {@inheritDoc} */
@Override public void write(Cache.Entry<? extends Integer, ? extends String> e)
throws CacheWriterException {
checkTx(session(), false);
lastMtd = "put";
checkOperation();
map.put(e.getKey(), e.getValue());
putCnt.incrementAndGet();
}
/** {@inheritDoc} */
@Override public void writeAll(Collection<Cache.Entry<? extends Integer, ? extends String>> entries)
throws CacheWriterException {
checkTx(session(), false);
lastMtd = "putAll";
checkOperation();
for (Cache.Entry<? extends Integer, ? extends String> e : entries)
this.map.put(e.getKey(), e.getValue());
putAllCnt.incrementAndGet();
}
/** {@inheritDoc} */
@Override public void delete(Object key) throws CacheWriterException {
checkTx(session(), false);
lastMtd = "remove";
checkOperation();
map.remove(key);
}
/** {@inheritDoc} */
@Override public void deleteAll(Collection<?> keys)
throws CacheWriterException {
checkTx(session(), false);
lastMtd = "removeAll";
checkOperation();
for (Object key : keys)
map.remove(key);
}
/** {@inheritDoc} */
@Override public void sessionEnd(boolean commit) {
// No-op.
}
/**
* Checks the flag and throws exception if it is set. Checks operation delay and sleeps
* for specified amount of time, if needed.
*/
private void checkOperation() {
if (shouldFail)
throw new IgniteException("Store exception.");
if (operationDelay > 0) {
try {
U.sleep(operationDelay);
}
catch (IgniteInterruptedCheckedException e) {
throw new IgniteException(e);
}
}
}
/**
* @param ses Session.
* @param load {@code True} if {@link #loadAll} method is called.
*/
private void checkTx(@Nullable CacheStoreSession ses, boolean load) {
Transaction tx = ses != null ? ses.transaction() : null;
if (tx == null)
return;
txs.add(tx);
IgniteInternalTx tx0 = GridTestUtils.getFieldValue(tx, "tx");
if (!tx0.local())
throw new IgniteException("Tx is not local: " + tx);
if (tx0.dht() && !load)
throw new IgniteException("Tx is DHT: " + tx);
}
/**
* @return Store session.
*/
private CacheStoreSession session() {
return ses;
}
}