blob: ae7703309c788715132894392b2c1967577abede [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.ujes.jdbc
import java.sql.{Blob, CallableStatement, Clob, Connection, DatabaseMetaData, NClob, PreparedStatement, ResultSet, SQLException, SQLWarning, SQLXML, Savepoint, Statement, Struct}
import java.util.Properties
import java.util.concurrent.Executor
import java.{sql, util}
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.ujes.client.UJESClient
import com.webank.wedatasphere.linkis.ujes.jdbc.UJESSQLDriverMain._
import org.apache.commons.lang.StringUtils
import scala.collection.{JavaConversions, mutable}
/**
 * A [[java.sql.Connection]] that runs SQL through a [[UJESClient]].
 *
 * Connection settings are read from `props`: the user, the server URL, the
 * initial database name (falling back to "default" when blank) and an optional
 * parameter string from which start-up variables and the `creator` are parsed.
 *
 * Every statement handed out by this connection is tracked in
 * `runningSQLStatements` so that [[close]] can close the ones still open.
 * Most optional JDBC features (transactions control, savepoints, LOBs, client
 * info, ...) are not supported and raise a [[UJESSQLException]].
 *
 * @param ujesClient client used by the statements of this connection; closed by [[close]]
 * @param props      connection properties supplied by the driver
 */
class UJESSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Properties) extends Connection with Logging {

  /** Creator label; defaults to "IDE" and may be overridden by a CREATOR entry in the params string. */
  private[jdbc] var creator = "IDE"

  /**
   * Variables parsed from the PARAMS property.
   *
   * The params string is split on PARAM_SPLIT, and each entry on KV_SPLIT.
   * Entries whose key starts with VARIABLE_HEADER are stored (with the header
   * stripped from the key); a CREATOR entry updates `creator` as a side effect.
   * Entries that do not split into exactly two parts are ignored.
   */
  private[jdbc] val variableMap = {
    val params = props.getProperty(PARAMS)
    val map = new mutable.HashMap[String, Any]
    if (params != null) {
      params.split(PARAM_SPLIT).map(_.split(KV_SPLIT)).foreach {
        case Array(k, v) if k.startsWith(VARIABLE_HEADER) =>
          map += k.substring(VARIABLE_HEADER.length) -> v
        case Array(CREATOR, v) =>
          creator = v
        case _ =>
      }
    }
    map.toMap
  }

  /** Database selected when the first statement is created; "default" when DB_NAME is blank. */
  private[jdbc] val dbName =
    if (StringUtils.isNotBlank(props.getProperty(DB_NAME))) props.getProperty(DB_NAME) else "default"

  /** Statements created by this connection that have not been removed yet. */
  private val runningSQLStatements = new util.LinkedList[Statement]
  private var closed = false
  /** Becomes true once the initial `USE <dbName>` has been attempted. */
  private var inited = false

  private[jdbc] val user = props.getProperty(USER)
  private[jdbc] val serverURL = props.getProperty("URL")

  /**
   * Evaluates `op` only if the connection is still open.
   *
   * @throws UJESSQLException with CONNECTION_CLOSED when the connection is closed
   */
  private[jdbc] def throwWhenClosed[T](op: => T): T =
    if (isClosed) throw new UJESSQLException(UJESSQLErrorCode.CONNECTION_CLOSED)
    else op

  /** Raises the exception used for every JDBC feature this connection does not implement. */
  private def unsupported(what: String): Nothing =
    throw new UJESSQLException(UJESSQLErrorCode.NOSUPPORT_CONNECTION, s"$what not supported")

  /**
   * Builds a statement, registers it, and on the very first call uses it to
   * switch to `dbName`.
   *
   * The `USE` is wrapped in `Utils.tryAndWarn`
   * (NOTE(review): presumably logs and swallows a failure - confirm).
   */
  private def createStatementAndAdd[T <: Statement](op: => T): T = throwWhenClosed {
    val statement = op
    runningSQLStatements.add(statement)
    if (!inited) {
      // Flag is set before executing so the USE is attempted only once.
      inited = true
      Utils.tryAndWarn(statement.execute(s"USE $dbName"))
    }
    statement
  }

  /** Closes an internally created statement (ignoring close errors) and stops tracking it. */
  private def closeAndForget(statement: Statement): Unit = {
    Utils.tryQuietly(statement.close())
    runningSQLStatements.remove(statement)
  }

  def getProps: Properties = props

  /** Stops tracking `statement`; it will no longer be closed by [[close]]. */
  def removeStatement(statement: UJESSQLStatement): Unit = runningSQLStatements.remove(statement)

  override def createStatement(): Statement = createStatementAndAdd(new UJESSQLStatement(this))

  override def prepareStatement(sql: String): UJESSQLPreparedStatement = {
    val statement = createStatementAndAdd(new UJESSQLPreparedStatement(this, sql))
    // createStatementAndAdd may have run the initial USE on this statement; reset it.
    statement.clearQuery()
    statement
  }

  /**
   * Creates a statement after validating the requested result set options.
   *
   * @throws SQLException (SQLState "HYC00") for a concurrency other than CONCUR_READ_ONLY
   *                      or for TYPE_SCROLL_SENSITIVE
   */
  override def createStatement(resultSetType: Int, resultSetConcurrency: Int): Statement = {
    if (resultSetConcurrency != ResultSet.CONCUR_READ_ONLY)
      throw new SQLException("Statement with resultset concurrency " + resultSetConcurrency + " is not supported", "HYC00")
    if (resultSetType == ResultSet.TYPE_SCROLL_SENSITIVE)
      throw new SQLException("Statement with resultset type " + resultSetType + " is not supported", "HYC00")
    createStatementAndAdd(new UJESSQLStatement(this))
  }

  /** The `autoGeneratedKeys` flag is ignored. */
  override def prepareStatement(sql: String, autoGeneratedKeys: Int): PreparedStatement = prepareStatement(sql)

  /** The result set type and concurrency are ignored. */
  override def prepareStatement(sql: String, resultSetType: Int, resultSetConcurrency: Int): PreparedStatement =
    prepareStatement(sql)

  override def getMetaData: DatabaseMetaData = throwWhenClosed(new UJESSQLDatabaseMetaData(this))

  /**
   * Closes all tracked statements, then the client. Calling it again is a no-op.
   *
   * Statements are closed before the client so that any clean-up they perform
   * can still use it, and `closed` is set even if closing the client fails.
   */
  override def close(): Unit = if (!closed) {
    // Iterate over a snapshot: closing a statement may call removeStatement,
    // which would otherwise modify the list while it is being traversed.
    val openStatements = runningSQLStatements.toArray(new Array[Statement](0))
    try {
      openStatements.foreach(statement => Utils.tryQuietly(statement.close()))
      runningSQLStatements.clear()
    } finally {
      try ujesClient.close()
      finally closed = true
    }
  }

  override def isClosed: Boolean = closed

  override def setReadOnly(readOnly: Boolean): Unit = unsupported("setReadOnly")
  override def isReadOnly: Boolean = false

  /** Catalogs are not used; only verifies that the connection is open. */
  override def setCatalog(catalog: String): Unit = throwWhenClosed(())
  override def getCatalog: String = ""

  override def setTransactionIsolation(level: Int): Unit = {}
  override def getTransactionIsolation: Int = Connection.TRANSACTION_NONE
  override def getWarnings: SQLWarning = null
  override def clearWarnings(): Unit = {}
  override def setAutoCommit(autoCommit: Boolean): Unit = {}
  override def getAutoCommit: Boolean = true
  override def commit(): Unit = {}

  override def prepareCall(sql: String): CallableStatement = unsupported("prepareCall")
  override def rollback(): Unit = unsupported("rollback")
  override def nativeSQL(sql: String): String = unsupported("nativeSQL")
  override def prepareCall(sql: String, resultSetType: Int, resultSetConcurrency: Int): CallableStatement =
    unsupported("prepareCall")
  override def getTypeMap: util.Map[String, Class[_]] = unsupported("getTypeMap")
  override def setTypeMap(map: util.Map[String, Class[_]]): Unit = unsupported("setTypeMap")
  override def setHoldability(holdability: Int): Unit = unsupported("setHoldability")

  /**
   * Returns a fixed default (0) instead of throwing, so that tools which
   * query the holdability can register this driver as a data source
   * (changed by owenxu, 2019/8/30).
   *
   * @return always 0
   */
  override def getHoldability: Int = 0

  override def setSavepoint(): Savepoint = unsupported("setSavepoint")
  override def setSavepoint(name: String): Savepoint = unsupported("setSavepoint")
  override def rollback(savepoint: Savepoint): Unit = unsupported("rollback")
  override def releaseSavepoint(savepoint: Savepoint): Unit = unsupported("releaseSavepoint")

  override def createStatement(resultSetType: Int, resultSetConcurrency: Int, resultSetHoldability: Int): Statement =
    unsupported("createStatement")
  override def prepareStatement(sql: String, resultSetType: Int, resultSetConcurrency: Int, resultSetHoldability: Int): PreparedStatement =
    unsupported("prepareStatement")
  override def prepareCall(sql: String, resultSetType: Int, resultSetConcurrency: Int, resultSetHoldability: Int): CallableStatement =
    unsupported("prepareCall")
  override def prepareStatement(sql: String, columnIndexes: Array[Int]): PreparedStatement =
    unsupported("prepareStatement")
  override def prepareStatement(sql: String, columnNames: Array[String]): PreparedStatement =
    unsupported("prepareStatement")

  override def createClob(): Clob = unsupported("createClob")
  override def createBlob(): Blob = unsupported("createBlob")
  override def createNClob(): NClob = unsupported("createNClob")
  override def createSQLXML(): SQLXML = unsupported("createSQLXML")

  /** The connection is reported valid for as long as it has not been closed; `timeout` is ignored. */
  override def isValid(timeout: Int): Boolean = !isClosed

  override def setClientInfo(name: String, value: String): Unit = unsupported("setClientInfo")
  override def setClientInfo(properties: Properties): Unit = unsupported("properties")
  override def getClientInfo(name: String): String = unsupported("getClientInfo")
  override def getClientInfo: Properties = unsupported("getClientInfo")
  override def createArrayOf(typeName: String, elements: Array[AnyRef]): sql.Array = unsupported("createArrayOf")
  override def createStruct(typeName: String, attributes: Array[AnyRef]): Struct = unsupported("createStruct")

  /**
   * Switches the current database by executing `use <schema>`.
   *
   * The helper statement is closed and untracked afterwards so repeated calls
   * do not accumulate statements on the connection.
   *
   * @throws UJESSQLException when the connection is closed or `schema` is blank
   */
  override def setSchema(schema: String): Unit = throwWhenClosed {
    if (StringUtils.isBlank(schema)) throw new UJESSQLException(UJESSQLErrorCode.NOSUPPORT_STATEMENT, "schema is empty!")
    val statement = createStatement()
    try statement.execute("use " + schema)
    finally closeAndForget(statement)
  }

  /**
   * Returns the current database by querying `SELECT current_database()`.
   *
   * The value is read before the helper statement is closed and untracked.
   *
   * @throws UJESSQLException when the connection is closed or the query returns no row
   */
  override def getSchema: String = throwWhenClosed {
    val statement = createStatement()
    try {
      val resultSet = statement.executeQuery("SELECT current_database()")
      if (!resultSet.next()) throw new UJESSQLException(UJESSQLErrorCode.NOSUPPORT_STATEMENT, "Get schema failed!")
      resultSet.getString(1)
    } finally closeAndForget(statement)
  }

  override def abort(executor: Executor): Unit = unsupported("abort")
  override def setNetworkTimeout(executor: Executor, milliseconds: Int): Unit = unsupported("setNetworkTimeout")
  override def getNetworkTimeout: Int = unsupported("getNetworkTimeout")

  /**
   * Returns this connection typed as `iface` when it implements that interface.
   *
   * @throws UJESSQLException when this connection is not an instance of `iface`
   */
  override def unwrap[T](iface: Class[T]): T =
    if (isWrapperFor(iface)) iface.cast(this)
    else unsupported("unwrap")

  /** True when this connection itself is an instance of `iface`; no delegate is wrapped. */
  override def isWrapperFor(iface: Class[_]): Boolean = iface != null && iface.isInstance(this)
}