| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.ignite.gatling.api |
| |
| import scala.util.Try |
| |
| import com.typesafe.scalalogging.StrictLogging |
| import io.gatling.commons.validation.Validation |
| import io.gatling.core.session.Session |
| import org.apache.ignite.Ignition |
| import org.apache.ignite.binary.BinaryObjectBuilder |
| import org.apache.ignite.client.ClientCacheConfiguration |
| import org.apache.ignite.client.IgniteClient |
| import org.apache.ignite.configuration.CacheConfiguration |
| import org.apache.ignite.configuration.ClientConfiguration |
| import org.apache.ignite.configuration.IgniteConfiguration |
| import org.apache.ignite.gatling.api.node.IgniteNodeApi |
| import org.apache.ignite.gatling.api.thin.IgniteThinApi |
| import org.apache.ignite.gatling.builder.ignite.SimpleCacheConfiguration |
| import org.apache.ignite.gatling.protocol.IgniteClientCfg |
| import org.apache.ignite.gatling.protocol.IgniteClientConfigurationCfg |
| import org.apache.ignite.gatling.protocol.IgniteClientPool |
| import org.apache.ignite.gatling.protocol.IgniteClientPoolCfg |
| import org.apache.ignite.gatling.protocol.IgniteConfigurationCfg |
| import org.apache.ignite.gatling.protocol.IgniteNodeCfg |
| import org.apache.ignite.gatling.protocol.IgniteProtocol |
| import org.apache.ignite.gatling.protocol.IgniteProtocol.IgniteApiSessionKey |
| import org.apache.ignite.transactions.TransactionConcurrency |
| import org.apache.ignite.transactions.TransactionIsolation |
| |
| /** |
| * Wrapper around the Ignite API abstracting access either via the Ignite Node (thick) API or via the |
| * Ignite Client (thin) API. |
| */ |
| trait IgniteApi { |
| /** |
| * Gets an instance of cache API by name. |
| * |
| * @tparam K Type of the cache key. |
| * @tparam V Type of the cache value. |
| * @return Instance of the CacheAPI. |
| */ |
| def cache[K, V]: String => Validation[CacheApi[K, V]] |
| |
| /** |
| * Gets existing cache or creates new one with the given name and the simplified cache configuration |
| * which may be defined via the Ignite Gatling DSL. |
| * |
| * @tparam K Type of the cache key. |
| * @tparam V Type of the cache value. |
| * @param name Cache name. |
| * @param cfg Simplified configuration of the cache (see the SimpleCacheConfiguration). |
| * @param s Function to be called if operation is competed successfully. |
| * @param f Function to be called if exception occurs. |
| */ |
| def getOrCreateCacheBySimpleConfig[K, V](name: String, cfg: SimpleCacheConfiguration)( |
| s: CacheApi[K, V] => Unit, |
| f: Throwable => Unit |
| ): Unit |
| |
| /** |
| * Gets existing cache or creates new one with the provided Ignite (thin) Client cache configuration. |
| * |
| * @tparam K Type of the cache key. |
| * @tparam V Type of the cache value. |
| * @param cfg Instance of Ignite Client cache configuration. |
| * @param s Function to be called if operation is competed successfully. |
| * @param f Function to be called if exception occurs. |
| */ |
| def getOrCreateCacheByClientConfiguration[K, V]( |
| cfg: ClientCacheConfiguration |
| )(s: CacheApi[K, V] => Unit, f: Throwable => Unit): Unit |
| |
| /** |
| * Gets existing cache or creates new one with the provided Ignite (thick) cache configuration. |
| * |
| * @tparam K Type of the cache key. |
| * @tparam V Type of the cache value. |
| * @param cfg Instance of Ignite cache configuration. |
| * @param s Function to be called if operation is competed successfully. |
| * @param f Function to be called if exception occurs. |
| */ |
| def getOrCreateCacheByConfiguration[K, V](cfg: CacheConfiguration[K, V])(s: CacheApi[K, V] => Unit, f: Throwable => Unit): Unit |
| |
| /** |
| * Closes the Ignite API. |
| * |
| * @param s Function to be called if operation is competed successfully. |
| * @param f Function to be called if exception occurs. |
| */ |
| def close()(s: Unit => Unit, f: Throwable => Unit): Unit |
| |
| /** |
| * Starts the transaction with the default parameters. |
| * |
| * @param s Function to be called if operation is competed successfully. |
| * @param f Function to be called if exception occurs. |
| */ |
| def txStart()(s: TransactionApi => Unit, f: Throwable => Unit): Unit |
| |
| /** |
| * Starts the transaction with the provided concurrency and isolation. |
| * |
| * @param concurrency Concurrency. |
| * @param isolation Isolation. |
| * @param s Function to be called if operation is competed successfully. |
| * @param f Function to be called if exception occurs. |
| */ |
| def txStart(concurrency: TransactionConcurrency, isolation: TransactionIsolation)( |
| s: TransactionApi => Unit, |
| f: Throwable => Unit |
| ): Unit |
| |
| /** |
| * Starts the transaction with the provided concurrency, isolation timeout and transaction size. |
| * |
| * @param concurrency Concurrency. |
| * @param isolation Isolation. |
| * @param timeout Timeout. |
| * @param size Number of entries participating in transaction (may be approximate). |
| * @param s Function to be called if operation is competed successfully. |
| * @param f Function to be called if exception occurs. |
| */ |
| def txStart(concurrency: TransactionConcurrency, isolation: TransactionIsolation, timeout: Long, size: Int)( |
| s: TransactionApi => Unit, |
| f: Throwable => Unit |
| ): Unit |
| |
| /** |
| * Creates instance of BinaryObjectBuilder. |
| * |
| * @return Instance of BinaryObjectBuilder. |
| */ |
| def binaryObjectBuilder: String => BinaryObjectBuilder |
| |
| /** |
| * Returns the wrapped Ignite API instance. |
| * |
| * @tparam I Class of the Ignite API. |
| * @return wrapped Ignite API instance. |
| */ |
| def wrapped[I]: I |
| } |
| |
| /** |
| * Factory for IgniteApi instances. |
| */ |
| object IgniteApi extends StrictLogging { |
| /** |
| * Starts new instance of IgniteApi. |
| * |
| * Starts new Ignite node or thin client instances if protocol was configured via configuration objects. |
| * Do nothing and returns existing instances from session otherwise. |
| * |
| * @param protocol Gatling Ignite protocol. |
| * @param session Gatling session. |
| * @param s Function to be called if operation is competed successfully. |
| * @param f Function to be called if exception occurs. |
| */ |
| def start(protocol: IgniteProtocol, session: Session)(s: IgniteApi => Unit, f: Throwable => Unit): Unit = |
| protocol.cfg match { |
| case IgniteClientConfigurationCfg(cfg) if protocol.explicitClientStart => Try(startClient(cfg)).fold(f, s) |
| |
| case IgniteConfigurationCfg(cfg) if protocol.explicitClientStart => Try(startNode(cfg)).fold(f, s) |
| |
| case _ => s(session(IgniteApiSessionKey).as[IgniteApi]) |
| } |
| |
| /** |
| * Creates default instance of IgniteApi on simulation start depending on the protocol passed. |
| * |
| * If protocol contains configuration objects it starts either ignite node |
| * in client mode or thin ignite client. |
| * |
| * If protocol contains started instances of node or client just returns it. |
| * |
| * @param protocol Gatling Ignite protocol |
| * @return Instance of Ignite API |
| */ |
| def apply(protocol: IgniteProtocol): Option[IgniteApi] = |
| protocol.cfg match { |
| case IgniteClientConfigurationCfg(cfg) if !protocol.explicitClientStart => Some(startClient(cfg)) |
| |
| case IgniteConfigurationCfg(cfg) if !protocol.explicitClientStart => Some(startNode(cfg)) |
| |
| case IgniteNodeCfg(node) => |
| node.cacheNames.forEach(name => node.cache(name)) |
| Some(IgniteNodeApi(node)) |
| |
| case IgniteClientCfg(igniteClient) => Some(IgniteThinApi(new IgniteClientPreStartedPool(igniteClient))) |
| |
| case IgniteClientPoolCfg(pool) => Some(IgniteThinApi(pool)) |
| |
| case _ => None |
| } |
| |
| /** |
| * Starts Ignite node in client mode. |
| * |
| * @param cfg Node configuration. |
| * @return Instance of IgniteNodeApi. |
| */ |
| private def startNode(cfg: IgniteConfiguration): IgniteNodeApi = { |
| val node = Ignition.start(cfg) |
| node.cacheNames.forEach(name => node.cache(name)) |
| IgniteNodeApi(node) |
| } |
| |
| /** |
| * Starts Ignite (thin) client. |
| * |
| * @param cfg Client configuration. |
| * @return Instance of IgniteThinApi. |
| */ |
| private def startClient(cfg: ClientConfiguration): IgniteThinApi = IgniteThinApi( |
| new IgniteClientPreStartedPool(Ignition.startClient(cfg)) |
| ) |
| } |
| |
| /** |
| * Pool enclosing the single pre-started IgniteClient instance. |
| * |
| * @param client Thin client instance. |
| */ |
| private class IgniteClientPreStartedPool(client: IgniteClient) extends IgniteClientPool { |
| override def apply(): IgniteClient = client |
| |
| override def close(): Unit = {} |
| } |