blob: 3daf0feb78915275b2905fc00ae7c633323557ed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nlpcraft.server.tx
import java.sql.Connection
import io.opencensus.trace.Span
import org.apache.ignite.IgniteTransactions
import org.apache.ignite.lang.IgniteUuid
import org.apache.ignite.transactions.{Transaction, TransactionConcurrency, TransactionIsolation}
import org.apache.nlpcraft.server.ignite.NCIgniteInstance
import org.apache.nlpcraft.common._
import org.apache.nlpcraft.common.NCService
import scala.collection.mutable
import scala.util.control.Exception.catching
/**
* Transaction manager based on Ignite transaction management. It manages both Ignite cache
* and JDBC operations, and allows for multi-threaded transactions.
*/
object NCTxManager extends NCService with NCIgniteInstance {
// Internal log switch.
private final val LOG_TX = false
@volatile private var cons: mutable.Map[IgniteUuid, Connection] = _
/**
*
* @return
*/
private var itx: IgniteTransactions = _
/**
*
* @param parent Optional parent span.
*/
override def stop(parent: Span = null): Unit = startScopedSpan("stop", parent) { _ ⇒
ackStopping()
// Close all still attached JDBC connections on stop.
if (cons != null)
cons.values.foreach(U.close)
ackStopped()
}
/**
*
* @param parent Optional parent span.
* @return
*/
override def start(parent: Span = null): NCService = startScopedSpan("start", parent) { _ ⇒
ackStarting()
cons = mutable.HashMap.empty[IgniteUuid, Connection]
itx = ignite.transactions()
ackStarted()
}
/**
* Gets Ignite transaction associated with the current thread, or `null` otherwise.
*/
def tx(): Transaction = catching(wrapIE) {
itx.tx()
}
/**
*
* @return
*/
def inTx(): Boolean = tx() != null
/**
* Attaches JDBC connection to the given transaction. No-op if transaction already
* has attached connection.
*
* @param tx Transaction to attach to.
* @param con JDBC connection to attach.
*/
def attach(tx: Transaction, con: Connection): Unit = {
val xid = tx.xid()
cons.synchronized {
if (!cons.contains(xid))
cons += xid → con
}
}
/**
* Attaches JDBC connection to the current transaction. No-op if there is not transaction or transaction already
* has attached connection.
*
* @param con JDBC connection to attach.
*/
def attach(con: Connection): Unit = {
val x = tx()
if (x != null)
attach(x, con)
}
/**
* Gets connection attached to the given transaction of the current thread, or `null`
* if JDBC connection wasn't attached to that thread.
*/
def connection(tx: Transaction): Connection = cons.synchronized {
cons.get(tx.xid).orNull
}
/**
* Gets connection attached to the transaction of the current thread, or `null`
* if (a) current thread has no ongoing transaction, or (b) JDBC connection wasn't
* attached.
*/
def connection(): Connection = {
val x = tx()
if (x != null)
cons.synchronized {
cons.get(x.xid).orNull
}
else
null
}
/**
*
* @param tx Transaction to finalize.
*/
private def finalizeTx(tx: Transaction): Unit = {
// Close tx (rollback if it wasn't explicitly committed).
try {
if (tx.isRollbackOnly)
tx.rollback()
tx.close()
if (LOG_TX)
logger.trace(s"Finalized tx: ${tx.xid}")
}
finally
// Detach the connection on tx finalization.
cons.synchronized {
cons.remove(tx.xid) match {
// If there was a JDBC connection - explicitly close it on tx close.
case Some(con) ⇒ U.close(con)
case None()
}
}
}
/**
*
* @param tx
* @param f
* @tparam R
* @return
*/
private def startTx[R](tx: Transaction)(f: Transaction ⇒ R): R = {
try {
val res = f(tx)
tx.commit()
res
}
catch {
case e: Throwable ⇒ tx.setRollbackOnly(); throw e
}
finally
finalizeTx(tx)
}
/**
*
* @param tx
* @param f
* @tparam R
* @return
*/
private def startTx0[R](tx: Transaction)(f: ⇒ R): R = {
try {
val res = f
tx.commit()
res
}
catch {
case e: Throwable ⇒ tx.setRollbackOnly(); throw e
}
finally
finalizeTx(tx)
}
private def ack(tx: Transaction): Unit =
if (LOG_TX)
logger.trace(s"New tx started: [xid=${tx.xid()}, iso=${tx.isolation()}, concurrency=${tx.concurrency()}]")
/**
* Executes given closure in the transaction context. It will start a new transaction if the current
* thread does not have one already.
*
* @param cry Transaction concurrency.
* @param iso Transaction isolation.
* @param timeout Transaction timeout.
* @param size Approximate number of cache entries participating in the transaction.
* @param f Closure to execute.
* @tparam R Result type.
*/
@throws[NCE]
def startTx[R](cry: TransactionConcurrency, iso: TransactionIsolation, timeout: Long, size: Int)
(f: Transaction ⇒ R): R =
catching(wrapIE) {
if (inTx()) {
val x = tx()
if (x.concurrency() != cry || x.isolation() != iso || x.timeout() != timeout)
throw new NCE(s"Requested transaction does not match the existing one: $x")
f(x)
}
else {
val tx = itx.txStart(cry, iso, timeout, size)
ack(tx)
startTx[R](tx)(f)
}
}
/**
* Executes given closure in the transaction context. It will start a new transaction if the current
* thread does not have one already.
*
* @param cry Transaction concurrency.
* @param iso Transaction isolation.
* @param f Closure to execute.
* @tparam R Result type.
*/
@throws[NCE]
def startTx[R](cry: TransactionConcurrency, iso: TransactionIsolation)(f: Transaction ⇒ R): R =
catching(wrapIE) {
if (inTx()) {
val x = tx()
if (x.concurrency() != cry || x.isolation() != iso)
throw new NCE(s"Requested transaction does not match the existing one: $x")
f(x)
}
else {
val tx = itx.txStart(cry, iso)
ack(tx)
startTx[R](tx)(f)
}
}
/**
* Executes given closure in the transaction context. It will start a new transaction if the current
* thread does not have one already.
*
* @param f Closure to execute.
* @tparam R Result type.
*/
@throws[NCE]
def startTx[R](f: Transaction ⇒ R): R =
catching(wrapIE) {
if (inTx())
f(tx())
else {
val tx = itx.txStart()
ack(tx)
startTx[R](tx)(f)
}
}
/**
* Executes given closure in the transaction context. It will start a new transaction if the current
* thread does not have one already.
*
* @param f Closure to execute.
* @tparam R Result type.
*/
@throws[NCE]
def startTx[R](f: ⇒ R): R =
catching(wrapIE) {
if (inTx())
f
else {
val tx = itx.txStart()
ack(tx)
startTx0[R](tx)(f)
}
}
}