[CARBONDATA-3630] update should support limit 1 sub query and empty result subquery

currently update has two flows, update by value and update by join.
a. update by join should be used only if subquery join present with maintable, now in non-join maintable sceneario also join is used. Fixed this.
b. currently subquery with limit 1 cannot support as it goes to join with main table, supported this
c. If subquery with limit 1 with join with main table is present, current design cannot handle it. so throwing exception
d. Supporting sub query with 0 results to update as null (behavior is same as mysql)

This closes #3528
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index cae86c0..0f131b1 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -19,7 +19,7 @@
 import java.io.File
 
 import org.apache.spark.sql.test.Spark2TestQueryExecutor
-import org.apache.spark.sql.{CarbonEnv, Row, SaveMode}
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SaveMode}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
@@ -80,6 +80,58 @@
     sql("""drop table iud.dest11""").show
   }
 
+  test("update with subquery having limit 1") {
+    sql("drop table if exists t1")
+    sql("drop table if exists t2")
+    sql("create table t1 (age int, name string) stored by 'carbondata'")
+    sql("insert into t1 select 1, 'aa'")
+    sql("insert into t1 select 3, 'bb'")
+    sql("create table t2 (age int, name string) stored by 'carbondata'")
+    sql("insert into t2 select 3, 'Andy'")
+    sql("insert into t2 select 2, 'Andy'")
+    sql("insert into t2 select 1, 'aa'")
+    sql("insert into t2 select 3, 'aa'")
+    sql("update t1 set (age) = " +
+        "(select t2.age from t2 where t2.name = 'Andy' order by  age limit 1) " +
+        "where t1.age = 1 ").show(false)
+    checkAnswer(sql("select * from t1"), Seq(Row(2,"aa"), Row(3,"bb")))
+    sql("drop table if exists t1")
+    sql("drop table if exists t2")
+  }
+
+  test("update with subquery giving 0 rows") {
+    sql("drop table if exists t1")
+    sql("drop table if exists t2")
+    sql("create table t1 (age int, name string) stored by 'carbondata'")
+    sql("insert into t1 select 1, 'aa'")
+    sql("create table t2 (age int, name string) stored by 'carbondata'")
+    sql("insert into t2 select 3, 'Andy'")
+    sql("update t1 set (age) = " +
+        "(select t2.age from t2 where t2.age != 3) " +
+        "where t1.age = 1 ").show(false)
+    // should update to null
+    checkAnswer(sql("select * from t1"), Seq(Row(null,"aa")))
+    sql("drop table if exists t1")
+    sql("drop table if exists t2")
+  }
+
+  test("update with subquery joing with main table and limit") {
+    sql("drop table if exists t1")
+    sql("drop table if exists t2")
+    sql("create table t1 (age int, name string) stored by 'carbondata'")
+    sql("insert into t1 select 1, 'Andy'")
+    sql("create table t2 (age int, name string) stored by 'carbondata'")
+    sql("insert into t2 select 3, 'Andy'")
+    intercept[AnalysisException] {
+      sql("update t1 set (age) = " +
+          "(select t2.age from t2 where t2.name = t1.name limit 1) " +
+          "where t1.age = 1 ").show(false)
+    }.getMessage.contains("Update subquery has join with maintable " +
+                          "and limit leads to multiple join for each limit for each row")
+    sql("drop table if exists t1")
+    sql("drop table if exists t2")
+  }
+
   test("update carbon table[using destination table columns with where and exist]") {
     sql("""drop table if exists iud.dest22""")
     sql("""create table iud.dest22 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 69145d8..10b661a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -20,7 +20,8 @@
 import scala.collection.mutable
 import scala.language.implicitConversions
 
-import org.apache.spark.sql.{CarbonToSparkAdapter, DeleteRecords, UpdateTable}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{CarbonToSparkAdapter, Dataset, DeleteRecords, ProjectForUpdate, SparkSession, UpdateTable}
 import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier}
 import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -31,7 +32,7 @@
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
 import org.apache.spark.sql.execution.command.cache.{CarbonDropCacheCommand, CarbonShowCacheCommand}
 import org.apache.spark.sql.execution.command.stream.{CarbonCreateStreamCommand, CarbonDropStreamCommand, CarbonShowStreamsCommand}
 import org.apache.spark.sql.util.CarbonException
@@ -247,8 +248,67 @@
       case tab ~ columns ~ rest =>
         val (sel, where) = splitQuery(rest)
         val selectPattern = """^\s*select\s+""".r
+        // In case of "update = (subquery) where something"
+        // If subquery has join with main table, then only it should go to "update by join" flow.
+        // Else it should go to "update by value" flow.
+        // In update by value flow, we need values to update.
+        // so need to execute plan and collect values from subquery if is not join with main table.
+        var subQueryResults : String = ""
+        if (selectPattern.findFirstIn(sel.toLowerCase).isDefined) {
+          // subQuery starts with select
+          val mainTableName = tab._4.table
+          val mainTableAlias = if (tab._3.isDefined) {
+            tab._3.get
+          } else {
+            ""
+          }
+          val session = SparkSession.getActiveSession.get
+          val subQueryUnresolvedLogicalPlan = session.sessionState.sqlParser.parsePlan(sel)
+          var isJoinWithMainTable : Boolean = false
+          var isLimitPresent : Boolean = false
+          subQueryUnresolvedLogicalPlan collect {
+            case f: Filter =>
+              f.condition.collect {
+                case attr: UnresolvedAttribute =>
+                  if ((!StringUtils.isEmpty(mainTableAlias) &&
+                       attr.nameParts.head.equalsIgnoreCase(mainTableAlias)) ||
+                      attr.nameParts.head.equalsIgnoreCase(mainTableName)) {
+                    isJoinWithMainTable = true
+                  }
+              }
+            case _: GlobalLimit =>
+              isLimitPresent = true
+          }
+          if (isJoinWithMainTable && isLimitPresent) {
+            throw new UnsupportedOperationException(
+              "Update subquery has join with maintable and limit leads to multiple join for each " +
+              "limit for each row")
+          }
+          if (!isJoinWithMainTable) {
+            // Should go as value update, not as join update. So execute the sub query.
+            val analyzedPlan = CarbonReflectionUtils.invokeAnalyzerExecute(session
+              .sessionState
+              .analyzer, subQueryUnresolvedLogicalPlan)
+            val subQueryLogicalPlan = session.sessionState.optimizer.execute(analyzedPlan)
+            val df = Dataset.ofRows(session, subQueryLogicalPlan)
+            val rowsCount = df.count()
+            if (rowsCount == 0L) {
+              // if length = 0, update to null
+              subQueryResults = "null"
+            } else if (rowsCount != 1) {
+              throw new UnsupportedOperationException(
+                " update cannot be supported for 1 to N mapping, as more than one value present " +
+                "for the update key")
+            } else {
+              subQueryResults = "'" + df.collect()(0).toSeq.mkString("','") + "'"
+            }
+          }
+        }
         val (selectStmt, relation) =
-          if (!selectPattern.findFirstIn(sel.toLowerCase).isDefined) {
+          if (!selectPattern.findFirstIn(sel.toLowerCase).isDefined ||
+              !StringUtils.isEmpty(subQueryResults)) {
+            // if subQueryResults are not empty means, it is not join with main table.
+            // so use subQueryResults in update with value flow.
             if (sel.trim.isEmpty) {
               sys.error("At least one source column has to be specified ")
             }
@@ -262,12 +322,16 @@
                 }
               case _ => tab._1
             }
-
+            val newSel = if (!StringUtils.isEmpty(subQueryResults)) {
+              subQueryResults
+            } else {
+              sel
+            }
             tab._3 match {
               case Some(a) =>
-                ("select " + sel + " from " + getTableName(tab._2) + " " + tab._3.get, relation)
+                ("select " + newSel + " from " + getTableName(tab._2) + " " + tab._3.get, relation)
               case None =>
-                ("select " + sel + " from " + getTableName(tab._2), relation)
+                ("select " + newSel + " from " + getTableName(tab._2), relation)
             }
 
           } else {