blob: 753f8889d4394ffca25c005fd186fa9cd742e46c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.gatling
import javax.cache.processor.MutableEntry
import scala.collection.SortedMap
import scala.collection.SortedSet
import scala.language.postfixOps
import com.typesafe.scalalogging.StrictLogging
import io.gatling.core.Predef._
import io.gatling.core.session.ExpressionSuccessWrapper
import org.apache.ignite.cache.CacheEntryProcessor
import org.apache.ignite.gatling.InvokeAllTest.getBatch
import org.apache.ignite.gatling.InvokeAllTest.testCache1
import org.apache.ignite.gatling.InvokeAllTest.testCache2
import org.apache.ignite.gatling.Predef._
import org.apache.ignite.gatling.Predef.group
import org.apache.ignite.gatling.utils.AbstractGatlingTest
import org.apache.ignite.gatling.utils.IgniteClientApi.NodeApi
import org.apache.ignite.gatling.utils.IgniteSupport
import org.junit.Test
/**
* Tests invokeAll.
*/
class InvokeAllTest extends AbstractGatlingTest {
@Test
/** Tests simulation with the single entry processor passed to invokeAll. */
def testSingleProcessor(): Unit = runWith(NodeApi)(classOf[SingleProcessorSimulation].getName)
@Test
/** Tests simulation with multiple entry processors passed to invokeAll. */
def multipleProcessors(): Unit = runWith(NodeApi)(classOf[MultipleProcessorsSimulation].getName)
}
object InvokeAllTest {
/** First test cache */
val testCache1 = "TEST-CACHE-1"
/** Second test cache */
val testCache2 = "TEST-CACHE-2"
/** Helper to extract the fed batch from session */
val getBatch = "#{batch}"
}
/**
* invokeAll with single entry processor simulation.
*/
class SingleProcessorSimulation extends Simulation with IgniteSupport with StrictLogging {
private val scn = scenario("invokeAll")
.feed(new BatchFeeder())
.ignite(
getOrCreateCache(testCache1) backups 1 atomicity ATOMIC mode PARTITIONED as "create",
group("run outside of transaction")(
asyncOperationsWithCache(testCache1)
),
getOrCreateCache(testCache2) atomicity TRANSACTIONAL mode REPLICATED,
tx run (
syncOperationsWithCache(testCache2)
) as "run in transaction"
)
private def syncOperationsWithCache(cache: String) = ignite(
putAll[Int, Int](cache, getBatch),
invokeAll[Int, Int, Unit](cache, SortedSet(1, 3, 5).expressionSuccess) { e: MutableEntry[Int, Int] =>
e.setValue(-e.getValue)
},
invokeAll[Int, Int, Unit](cache, SortedSet(1, 5, 3).expressionSuccess).args(3.expressionSuccess) {
(e: MutableEntry[Int, Int], args: Seq[Any]) =>
e.setValue(e.getValue * args.head.asInstanceOf[Integer])
},
getAll[Int, Int](cache, SortedSet(1, 3, 5))
.check(
entries[Int, Int].findAll.validate((entries: Seq[Entry[Int, Int]], _: Session) =>
entries.forall(e => e.value == -3 * (e.key + 1))
)
) as "getAll"
)
private def asyncOperationsWithCache(cache: String) = ignite(
putAll[Int, Int](cache, getBatch) async,
invokeAll[Int, Int, Unit](cache, SortedSet(1, 3, 5).expressionSuccess) { e: MutableEntry[Int, Int] =>
e.setValue(-e.getValue)
} async,
invokeAll[Int, Int, Unit](cache, SortedSet(1, 5, 3).expressionSuccess).args(3.expressionSuccess) {
(e: MutableEntry[Int, Int], args: Seq[Any]) =>
e.setValue(e.getValue * args.head.asInstanceOf[Integer])
} async,
getAll[Int, Int](cache, SortedSet(1, 3, 5))
.check(
entries[Int, Int].findAll.validate((entries: Seq[Entry[Int, Int]], _: Session) =>
entries.forall(e => e.value == -3 * (e.key + 1))
)
)
.async as "getAll"
)
setUp(scn.inject(atOnceUsers(1)))
.protocols(protocol)
.assertions(
global.failedRequests.count.is(0)
)
}
/** Entry processor with arguments. */
private class EntryProcessor extends CacheEntryProcessor[Int, Int, Unit]() {
override def process(e: MutableEntry[Int, Int], args: Object*): Unit =
e.setValue(e.getValue * args.toList.head.asInstanceOf[Integer])
}
/**
* invokeAll with multiple entry processors simulation.
*/
class MultipleProcessorsSimulation extends Simulation with IgniteSupport with StrictLogging {
private val scn = scenario("invokeAll")
.feed(new BatchFeeder())
.ignite(
getOrCreateCache(testCache1) backups 1 atomicity ATOMIC mode PARTITIONED as "create",
group("run outside of transaction")(
asyncOperationsWithCache(testCache1)
),
getOrCreateCache(testCache2) atomicity TRANSACTIONAL mode REPLICATED,
tx run (
syncOperationsWithCache(testCache2)
) as "run in transaction"
)
private def syncOperationsWithCache(cache: String) = ignite(
putAll[Int, Int](cache, getBatch),
invokeAll[Int, Int, Unit](
cache,
map = SortedMap(
1 -> new EntryProcessor(),
3 -> new EntryProcessor()
)
).args(2.expressionSuccess),
getAll[Int, Int](cache, SortedSet(1, 3))
.check(
entries[Int, Int].findAll.validate((entries: Seq[Entry[Int, Int]], _: Session) =>
entries.forall(e => e.value == 2 * (e.key + 1))
)
)
)
private def asyncOperationsWithCache(cache: String) = ignite(
putAll[Int, Int](cache, getBatch) async,
invokeAll[Int, Int, Unit](
cache,
map = SortedMap(
1 -> new EntryProcessor(),
3 -> new EntryProcessor()
)
).args(2.expressionSuccess) async,
getAll[Int, Int](cache, SortedSet(1, 3))
.check(
entries[Int, Int].findAll.validate((entries: Seq[Entry[Int, Int]], _: Session) =>
entries.forall(e => e.value == 2 * (e.key + 1))
)
) async
)
setUp(scn.inject(atOnceUsers(1)))
.protocols(protocol)
.assertions(
global.failedRequests.count.is(0)
)
}