/*
 * Copyright 2019 WeBank
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| package com.webank.wedatasphere.linkis.ujes.jdbc |
| |
| import java.sql.{Blob, CallableStatement, Clob, Connection, DatabaseMetaData, NClob, PreparedStatement, ResultSet, SQLException, SQLWarning, SQLXML, Savepoint, Statement, Struct} |
| import java.util.Properties |
| import java.util.concurrent.Executor |
| import java.{sql, util} |
| |
| import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils} |
| import com.webank.wedatasphere.linkis.ujes.client.UJESClient |
| import com.webank.wedatasphere.linkis.ujes.jdbc.UJESSQLDriverMain._ |
| import org.apache.commons.lang.StringUtils |
| |
| import scala.collection.{JavaConversions, mutable} |
| |
/**
 * JDBC [[java.sql.Connection]] implementation that runs statements through a [[UJESClient]].
 *
 * The connection keeps track of every statement it creates so that they can be closed together
 * with the connection. Transactions, savepoints, callable statements, LOB factories, client info
 * and network timeouts are not supported and raise a [[UJESSQLException]].
 *
 * @param ujesClient client used by the statements of this connection; closed by [[close]]
 * @param props      connection properties (user, URL, database name and extra parameters)
 */
class UJESSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Properties) extends Connection with Logging {

  /** Creator label of this connection; "IDE" unless overridden by a CREATOR entry in the PARAMS property. */
  private[jdbc] var creator = "IDE"

  /**
   * Variables parsed from the PARAMS property.
   *
   * PARAMS is split on PARAM_SPLIT into entries and each entry on KV_SPLIT into key and value.
   * Keys starting with VARIABLE_HEADER are stored with that prefix removed; a CREATOR entry
   * overrides [[creator]] as a side effect. NOTE: `creator` must stay declared above this
   * initializer, otherwise its default would overwrite the parsed value.
   */
  private[jdbc] val variableMap = {
    val params = props.getProperty(PARAMS)
    val map = new mutable.HashMap[String, Any]
    if (params != null) {
      params.split(PARAM_SPLIT).map(_.split(KV_SPLIT)).foreach {
        case Array(k, v) if k.startsWith(VARIABLE_HEADER) =>
          map += k.substring(VARIABLE_HEADER.length) -> v
        case Array(CREATOR, v) =>
          creator = v
        case _ => // entries that do not split into exactly one key and one value are ignored
      }
    }
    map.toMap
  }

  /** Database selected when the first statement is created; "default" when DB_NAME is blank or missing. */
  private[jdbc] val dbName = if (StringUtils.isNotBlank(props.getProperty(DB_NAME))) props.getProperty(DB_NAME) else "default"

  /** Statements created by this connection that have not been removed yet. */
  private val runningSQLStatements = new util.LinkedList[Statement]

  private var closed = false

  /** Becomes true once the initial `USE <dbName>` has been attempted. */
  private var inited = false

  private[jdbc] val user = props.getProperty(USER)

  private[jdbc] val serverURL = props.getProperty("URL")

  /**
   * Evaluates `op` only while the connection is open.
   *
   * @throws UJESSQLException with CONNECTION_CLOSED when the connection has been closed
   */
  private[jdbc] def throwWhenClosed[T](op: => T): T =
    if (isClosed) throw new UJESSQLException(UJESSQLErrorCode.CONNECTION_CLOSED)
    else op

  /** Raises the standard "not supported" error for the given JDBC method name. */
  private def notSupported(method: String): Nothing =
    throw new UJESSQLException(UJESSQLErrorCode.NOSUPPORT_CONNECTION, s"$method not supported")

  /**
   * Creates a statement via `op` and registers it with this connection.
   *
   * The very first statement created is additionally used to run `USE <dbName>`; that call goes
   * through `Utils.tryAndWarn`, so a failure there does not prevent the statement being returned
   * (presumably it is only logged -- confirm against `Utils`).
   */
  private def createStatementAndAdd[T <: Statement](op: => T): T = throwWhenClosed {
    val statement = op
    runningSQLStatements.add(statement)
    if (!inited) {
      inited = true
      Utils.tryAndWarn(statement.execute(s"USE $dbName"))
    }
    statement
  }

  /** Best-effort close of an internally created statement, then drops it from the tracking list. */
  private def closeAndForget(statement: Statement): Unit = {
    Utils.tryQuietly(statement.close())
    runningSQLStatements.remove(statement)
  }

  def getProps: Properties = props

  /** Stops tracking `statement`; it will no longer be closed by [[close]]. */
  def removeStatement(statement: UJESSQLStatement): Unit = runningSQLStatements.remove(statement)

  override def createStatement(): Statement = createStatementAndAdd(new UJESSQLStatement(this))

  override def prepareStatement(sql: String): UJESSQLPreparedStatement = {
    val statement = createStatementAndAdd(new UJESSQLPreparedStatement(this, sql))
    // NOTE(review): presumably resets state left behind by the initial USE executed on this statement.
    statement.clearQuery()
    statement
  }

  /**
   * Creates a statement; only read-only concurrency and non scroll-sensitive result sets are accepted.
   *
   * @throws SQLException (SQL state HYC00) for any other concurrency or for TYPE_SCROLL_SENSITIVE
   */
  override def createStatement(resultSetType: Int, resultSetConcurrency: Int): Statement = {
    if (resultSetConcurrency != ResultSet.CONCUR_READ_ONLY)
      throw new SQLException(s"Statement with resultset concurrency $resultSetConcurrency is not supported", "HYC00")
    if (resultSetType == ResultSet.TYPE_SCROLL_SENSITIVE)
      throw new SQLException(s"Statement with resultset type $resultSetType is not supported", "HYC00")
    createStatementAndAdd(new UJESSQLStatement(this))
  }

  /** The `autoGeneratedKeys` flag is ignored. */
  override def prepareStatement(sql: String, autoGeneratedKeys: Int): PreparedStatement = prepareStatement(sql)

  /** The result set type and concurrency are ignored. */
  override def prepareStatement(sql: String, resultSetType: Int, resultSetConcurrency: Int): PreparedStatement = prepareStatement(sql)

  override def getMetaData: DatabaseMetaData = throwWhenClosed(new UJESSQLDatabaseMetaData(this))

  /**
   * Closes every tracked statement and then the underlying client.
   *
   * Calling it again on an already closed connection is a no-op. The connection is marked closed
   * even if closing the client fails.
   */
  override def close(): Unit = if (!closed) {
    // Iterate over a snapshot: a statement's close() may call removeStatement(), and mutating the
    // LinkedList while iterating it would raise ConcurrentModificationException.
    val statements = runningSQLStatements.toArray(new Array[Statement](0))
    try {
      // Statements go first because they may still need the client while shutting down.
      statements.foreach(statement => Utils.tryQuietly(statement.close()))
      runningSQLStatements.clear()
      ujesClient.close()
    } finally closed = true
  }

  override def isClosed: Boolean = closed

  override def setReadOnly(readOnly: Boolean): Unit = notSupported("setReadOnly")

  override def isReadOnly: Boolean = false

  /** Catalogs are not used; the call only verifies that the connection is still open. */
  override def setCatalog(catalog: String): Unit = throwWhenClosed(())

  override def getCatalog: String = ""

  // Transaction related calls are accepted but have no effect; the connection always reports auto-commit.

  override def setTransactionIsolation(level: Int): Unit = {}

  override def getTransactionIsolation: Int = Connection.TRANSACTION_NONE

  override def getWarnings: SQLWarning = null

  override def clearWarnings(): Unit = {}

  override def setAutoCommit(autoCommit: Boolean): Unit = {}

  override def getAutoCommit: Boolean = true

  override def commit(): Unit = {}

  override def prepareCall(sql: String): CallableStatement = notSupported("prepareCall")

  override def rollback(): Unit = notSupported("rollback")

  override def nativeSQL(sql: String): String = notSupported("nativeSQL")

  override def prepareCall(sql: String, resultSetType: Int, resultSetConcurrency: Int): CallableStatement =
    notSupported("prepareCall")

  override def getTypeMap: util.Map[String, Class[_]] = notSupported("getTypeMap")

  override def setTypeMap(map: util.Map[String, Class[_]]): Unit = notSupported("setTypeMap")

  override def setHoldability(holdability: Int): Unit = notSupported("setHoldability")

  /**
   * Returns a default value (0) instead of throwing, which is required for this JDBC driver
   * to be added as a source (changed by owenxu, 2019/8/30).
   *
   * @return always 0
   */
  override def getHoldability: Int = 0

  override def setSavepoint(): Savepoint = notSupported("setSavepoint")

  override def setSavepoint(name: String): Savepoint = notSupported("setSavepoint")

  override def rollback(savepoint: Savepoint): Unit = notSupported("rollback")

  override def releaseSavepoint(savepoint: Savepoint): Unit = notSupported("releaseSavepoint")

  override def createStatement(resultSetType: Int, resultSetConcurrency: Int, resultSetHoldability: Int): Statement =
    notSupported("createStatement")

  override def prepareStatement(sql: String, resultSetType: Int, resultSetConcurrency: Int, resultSetHoldability: Int): PreparedStatement =
    notSupported("prepareStatement")

  override def prepareCall(sql: String, resultSetType: Int, resultSetConcurrency: Int, resultSetHoldability: Int): CallableStatement =
    notSupported("prepareCall")

  override def prepareStatement(sql: String, columnIndexes: Array[Int]): PreparedStatement =
    notSupported("prepareStatement")

  override def prepareStatement(sql: String, columnNames: Array[String]): PreparedStatement =
    notSupported("prepareStatement")

  override def createClob(): Clob = notSupported("createClob")

  override def createBlob(): Blob = notSupported("createBlob")

  override def createNClob(): NClob = notSupported("createNClob")

  override def createSQLXML(): SQLXML = notSupported("createSQLXML")

  /**
   * Reports whether the connection is still usable.
   *
   * No round trip to the server is made and `timeout` is ignored; the result only reflects
   * whether [[close]] has been called.
   */
  override def isValid(timeout: Int): Boolean = !isClosed

  override def setClientInfo(name: String, value: String): Unit = notSupported("setClientInfo")

  override def setClientInfo(properties: Properties): Unit = notSupported("setClientInfo")

  override def getClientInfo(name: String): String = notSupported("getClientInfo")

  override def getClientInfo: Properties = notSupported("getClientInfo")

  override def createArrayOf(typeName: String, elements: Array[AnyRef]): sql.Array = notSupported("createArrayOf")

  override def createStruct(typeName: String, attributes: Array[AnyRef]): Struct = notSupported("createStruct")

  /**
   * Switches the current database by executing `use <schema>`.
   *
   * The statement created for this purpose is closed and untracked afterwards.
   *
   * @throws UJESSQLException when `schema` is blank or the connection is closed
   */
  override def setSchema(schema: String): Unit = throwWhenClosed {
    if (StringUtils.isBlank(schema)) throw new UJESSQLException(UJESSQLErrorCode.NOSUPPORT_STATEMENT, "schema is empty!")
    val statement = createStatement()
    try statement.execute("use " + schema)
    finally closeAndForget(statement)
  }

  /**
   * Returns the current database as reported by `SELECT current_database()`.
   *
   * The statement and result set used for the lookup are released before returning.
   *
   * @throws UJESSQLException when the query yields no row or the connection is closed
   */
  override def getSchema: String = throwWhenClosed {
    val statement = createStatement()
    try {
      val resultSet = statement.executeQuery("SELECT current_database()")
      try {
        if (!resultSet.next()) throw new UJESSQLException(UJESSQLErrorCode.NOSUPPORT_STATEMENT, "Get schema failed!")
        resultSet.getString(1)
      } finally Utils.tryQuietly(resultSet.close())
    } finally closeAndForget(statement)
  }

  override def abort(executor: Executor): Unit = notSupported("abort")

  override def setNetworkTimeout(executor: Executor, milliseconds: Int): Unit =
    notSupported("setNetworkTimeout")

  override def getNetworkTimeout: Int = notSupported("getNetworkTimeout")

  /**
   * Returns this connection when it implements `iface`.
   *
   * @throws UJESSQLException when this connection is not an instance of `iface`
   */
  override def unwrap[T](iface: Class[T]): T =
    if (isWrapperFor(iface)) iface.cast(this)
    else notSupported("unwrap")

  /** True when this connection itself is an instance of `iface`; no delegate is wrapped. */
  override def isWrapperFor(iface: Class[_]): Boolean = iface != null && iface.isInstance(this)
}