blob: e4ceefabb015cead9d22961f3146e2246d36fa46 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.gatling.api
import scala.util.Try
import com.typesafe.scalalogging.StrictLogging
import io.gatling.commons.validation.Validation
import io.gatling.core.session.Session
import org.apache.ignite.Ignition
import org.apache.ignite.binary.BinaryObjectBuilder
import org.apache.ignite.client.ClientCacheConfiguration
import org.apache.ignite.client.IgniteClient
import org.apache.ignite.configuration.CacheConfiguration
import org.apache.ignite.configuration.ClientConfiguration
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.gatling.api.node.IgniteNodeApi
import org.apache.ignite.gatling.api.thin.IgniteThinApi
import org.apache.ignite.gatling.builder.ignite.SimpleCacheConfiguration
import org.apache.ignite.gatling.protocol.IgniteClientCfg
import org.apache.ignite.gatling.protocol.IgniteClientConfigurationCfg
import org.apache.ignite.gatling.protocol.IgniteClientPool
import org.apache.ignite.gatling.protocol.IgniteClientPoolCfg
import org.apache.ignite.gatling.protocol.IgniteConfigurationCfg
import org.apache.ignite.gatling.protocol.IgniteNodeCfg
import org.apache.ignite.gatling.protocol.IgniteProtocol
import org.apache.ignite.gatling.protocol.IgniteProtocol.IgniteApiSessionKey
import org.apache.ignite.transactions.TransactionConcurrency
import org.apache.ignite.transactions.TransactionIsolation
/**
* Wrapper around the Ignite API abstracting access either via the Ignite Node (thick) API or via the
* Ignite Client (thin) API.
*/
trait IgniteApi {
/**
* Gets an instance of cache API by name.
*
* @tparam K Type of the cache key.
* @tparam V Type of the cache value.
* @return Instance of the CacheAPI.
*/
def cache[K, V]: String => Validation[CacheApi[K, V]]
/**
* Gets existing cache or creates new one with the given name and the simplified cache configuration
* which may be defined via the Ignite Gatling DSL.
*
* @tparam K Type of the cache key.
* @tparam V Type of the cache value.
* @param name Cache name.
* @param cfg Simplified configuration of the cache (see the SimpleCacheConfiguration).
* @param s Function to be called if operation is competed successfully.
* @param f Function to be called if exception occurs.
*/
def getOrCreateCacheBySimpleConfig[K, V](name: String, cfg: SimpleCacheConfiguration)(
s: CacheApi[K, V] => Unit,
f: Throwable => Unit
): Unit
/**
* Gets existing cache or creates new one with the provided Ignite (thin) Client cache configuration.
*
* @tparam K Type of the cache key.
* @tparam V Type of the cache value.
* @param cfg Instance of Ignite Client cache configuration.
* @param s Function to be called if operation is competed successfully.
* @param f Function to be called if exception occurs.
*/
def getOrCreateCacheByClientConfiguration[K, V](
cfg: ClientCacheConfiguration
)(s: CacheApi[K, V] => Unit, f: Throwable => Unit): Unit
/**
* Gets existing cache or creates new one with the provided Ignite (thick) cache configuration.
*
* @tparam K Type of the cache key.
* @tparam V Type of the cache value.
* @param cfg Instance of Ignite cache configuration.
* @param s Function to be called if operation is competed successfully.
* @param f Function to be called if exception occurs.
*/
def getOrCreateCacheByConfiguration[K, V](cfg: CacheConfiguration[K, V])(s: CacheApi[K, V] => Unit, f: Throwable => Unit): Unit
/**
* Closes the Ignite API.
*
* @param s Function to be called if operation is competed successfully.
* @param f Function to be called if exception occurs.
*/
def close()(s: Unit => Unit, f: Throwable => Unit): Unit
/**
* Starts the transaction with the default parameters.
*
* @param s Function to be called if operation is competed successfully.
* @param f Function to be called if exception occurs.
*/
def txStart()(s: TransactionApi => Unit, f: Throwable => Unit): Unit
/**
* Starts the transaction with the provided concurrency and isolation.
*
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param s Function to be called if operation is competed successfully.
* @param f Function to be called if exception occurs.
*/
def txStart(concurrency: TransactionConcurrency, isolation: TransactionIsolation)(
s: TransactionApi => Unit,
f: Throwable => Unit
): Unit
/**
* Starts the transaction with the provided concurrency, isolation timeout and transaction size.
*
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param timeout Timeout.
* @param size Number of entries participating in transaction (may be approximate).
* @param s Function to be called if operation is competed successfully.
* @param f Function to be called if exception occurs.
*/
def txStart(concurrency: TransactionConcurrency, isolation: TransactionIsolation, timeout: Long, size: Int)(
s: TransactionApi => Unit,
f: Throwable => Unit
): Unit
/**
* Creates instance of BinaryObjectBuilder.
*
* @return Instance of BinaryObjectBuilder.
*/
def binaryObjectBuilder: String => BinaryObjectBuilder
/**
* Returns the wrapped Ignite API instance.
*
* @tparam I Class of the Ignite API.
* @return wrapped Ignite API instance.
*/
def wrapped[I]: I
}
/**
* Factory for IgniteApi instances.
*/
object IgniteApi extends StrictLogging {
/**
* Starts new instance of IgniteApi.
*
* Starts new Ignite node or thin client instances if protocol was configured via configuration objects.
* Do nothing and returns existing instances from session otherwise.
*
* @param protocol Gatling Ignite protocol.
* @param session Gatling session.
* @param s Function to be called if operation is competed successfully.
* @param f Function to be called if exception occurs.
*/
def start(protocol: IgniteProtocol, session: Session)(s: IgniteApi => Unit, f: Throwable => Unit): Unit =
protocol.cfg match {
case IgniteClientConfigurationCfg(cfg) if protocol.explicitClientStart => Try(startClient(cfg)).fold(f, s)
case IgniteConfigurationCfg(cfg) if protocol.explicitClientStart => Try(startNode(cfg)).fold(f, s)
case _ => s(session(IgniteApiSessionKey).as[IgniteApi])
}
/**
* Creates default instance of IgniteApi on simulation start depending on the protocol passed.
*
* If protocol contains configuration objects it starts either ignite node
* in client mode or thin ignite client.
*
* If protocol contains started instances of node or client just returns it.
*
* @param protocol Gatling Ignite protocol
* @return Instance of Ignite API
*/
def apply(protocol: IgniteProtocol): Option[IgniteApi] =
protocol.cfg match {
case IgniteClientConfigurationCfg(cfg) if !protocol.explicitClientStart => Some(startClient(cfg))
case IgniteConfigurationCfg(cfg) if !protocol.explicitClientStart => Some(startNode(cfg))
case IgniteNodeCfg(node) =>
node.cacheNames.forEach(name => node.cache(name))
Some(IgniteNodeApi(node))
case IgniteClientCfg(igniteClient) => Some(IgniteThinApi(new IgniteClientPreStartedPool(igniteClient)))
case IgniteClientPoolCfg(pool) => Some(IgniteThinApi(pool))
case _ => None
}
/**
* Starts Ignite node in client mode.
*
* @param cfg Node configuration.
* @return Instance of IgniteNodeApi.
*/
private def startNode(cfg: IgniteConfiguration): IgniteNodeApi = {
val node = Ignition.start(cfg)
node.cacheNames.forEach(name => node.cache(name))
IgniteNodeApi(node)
}
/**
* Starts Ignite (thin) client.
*
* @param cfg Client configuration.
* @return Instance of IgniteThinApi.
*/
private def startClient(cfg: ClientConfiguration): IgniteThinApi = IgniteThinApi(
new IgniteClientPreStartedPool(Ignition.startClient(cfg))
)
}
/**
* Pool enclosing the single pre-started IgniteClient instance.
*
* @param client Thin client instance.
*/
private class IgniteClientPreStartedPool(client: IgniteClient) extends IgniteClientPool {
override def apply(): IgniteClient = client
override def close(): Unit = {}
}