| /* |
| * Copyright 2019 WeBank |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package com.webank.wedatasphere.linkis.engine.cs |
| |
| import java.util.regex.Pattern |
| |
| import com.webank.wedatasphere.linkis.common.utils.Logging |
| import com.webank.wedatasphere.linkis.cs.client.service.CSTableService |
| import com.webank.wedatasphere.linkis.cs.common.entity.metadata.CSTable |
| import com.webank.wedatasphere.linkis.cs.common.utils.CSCommonUtils |
| import com.webank.wedatasphere.linkis.engine.exception.ExecuteError |
| import com.webank.wedatasphere.linkis.engine.execute.EngineExecutorContext |
| import org.apache.commons.lang.StringUtils |
| import org.apache.spark.sql.SparkSession |
| import org.apache.spark.sql.execution.datasources.csv.DolphinToSpark |
| |
| import scala.collection.mutable.ArrayBuffer |
| |
| /** |
| * @author peacewong |
| * @date 2020/3/6 9:57 |
| */ |
| class CSTableParser extends Logging { |
| |
| private val pb: Pattern = Pattern.compile(CSCommonUtils.CS_TMP_TABLE_PREFIX + "[^\\s\";'()]+[$\\s]{0,1}", Pattern.CASE_INSENSITIVE) |
| |
| private val DB = "default." |
| |
| |
| private def getCSTempTable(code: String): Array[String] = { |
| val bmlResourceNames = new ArrayBuffer[String]() |
| val mb = pb.matcher(code) |
| while (mb.find) bmlResourceNames.append(mb.group.trim) |
| bmlResourceNames.toArray |
| } |
| |
| /** |
| * 1. code parse cs_tamp |
| * 2. getCSTable |
| * 3. registerTable:暂时用dopphin,后续修改为拼接sql语句 |
| * |
| * @param engineExecutorContext |
| * @param code |
| * @param contextIDValueStr |
| * @param nodeNameStr |
| * @return |
| */ |
| def parse(engineExecutorContext: EngineExecutorContext, code: String, contextIDValueStr: String, nodeNameStr: String): String = { |
| val csTempTables = getCSTempTable(code) |
| val parsedTables = new ArrayBuffer[String]() |
| csTempTables.foreach{ csTempTable => |
| val table = getCSTable(csTempTable, contextIDValueStr, nodeNameStr) |
| if (null == table){ |
| throw new ExecuteError(40007,s"The csTable that name is $csTempTable not found in cs") |
| } |
| registerTempTable(table) |
| parsedTables.append(csTempTable) |
| } |
| StringUtils.replaceEach(code, csTempTables,parsedTables.toArray) |
| } |
| |
| /** |
| * TODO peaceWong From cs to get csTable |
| * Exact Match |
| * @param csTempTable |
| * @return |
| */ |
| def getCSTable(csTempTable:String, contextIDValueStr: String, nodeNameStr: String):CSTable = { |
| CSTableService.getInstance().getUpstreamSuitableTable(contextIDValueStr, nodeNameStr, csTempTable) |
| } |
| |
| def registerTempTable(csTable: CSTable):Unit = { |
| val spark = SparkSession.builder().enableHiveSupport().getOrCreate() |
| info(s"Start to create tempView to sparkSession viewName(${csTable.getName}) location(${csTable.getLocation})") |
| DolphinToSpark.createTempView(spark, csTable.getName, csTable.getLocation, true) |
| info(s"Finished to create tempView to sparkSession viewName(${csTable.getName}) location(${csTable.getLocation})") |
| } |
| } |