[CARBONDATA-3501] Fix update table with varchar column

Problem
An update on a table that has a varchar column will throw an exception.

Analysis
In the loading part of the update operation, it gets the isVarcharTypeMapping for each column in the order defined when the table was created. This gives a hint for checking string length: a column that is not of varchar type is not allowed to exceed 32000 characters.

However, when changing the plan for the update in CarbonIUDRule, it first deletes the old expression and then appends the new one, which makes the column order differ from the order defined at table creation. As a result, the string length check fails.

Solution
Keep the column order as defined at table creation when modifying the update plan.

This closes #3398
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
index 4fd2cc0..9719cfc 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
@@ -389,6 +389,16 @@
 
     sql("DROP TABLE IF EXISTS varchar_complex_table")
   }
+  
+  test("update table with long string column") {
+    prepareTable()
+    // update non-varchar column
+    sql(s"update $longStringTable set(id)=(0) where name is not null").show()
+    // update varchar column
+    sql(s"update $longStringTable set(description)=('empty') where name is not null").show()
+    // update non-varchar&varchar column
+    sql(s"update $longStringTable set(description, id)=('sth.', 1) where name is not null").show()
+  }
 
     // ignore this test in CI, because it will need at least 4GB memory to run successfully
   ignore("Exceed 2GB per column page for varchar datatype") {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 6a03eab..b2f9a1e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -1060,7 +1060,7 @@
     val dropAttributes = df.logicalPlan.output.dropRight(1)
     val finalOutput = catalogTable.schema.map { attr =>
       dropAttributes.find { d =>
-        val index = d.name.lastIndexOf("-updatedColumn")
+        val index = d.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)
         if (index > 0) {
           d.name.substring(0, index).equalsIgnoreCase(attr.name)
         } else {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 9b923b0..d11bf1e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -122,9 +122,9 @@
         val renamedProjectList = projectList.zip(columns).map { case (attr, col) =>
           attr match {
             case UnresolvedAlias(child22, _) =>
-              UnresolvedAlias(Alias(child22, col + "-updatedColumn")())
+              UnresolvedAlias(Alias(child22, col + CarbonCommonConstants.UPDATED_COL_EXTENSION)())
             case UnresolvedAttribute(_) =>
-              UnresolvedAlias(Alias(attr, col + "-updatedColumn")())
+              UnresolvedAlias(Alias(attr, col + CarbonCommonConstants.UPDATED_COL_EXTENSION)())
             case _ => attr
           }
         }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
index ae5825d..da1ca55 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
@@ -23,6 +23,8 @@
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.mutation.CarbonProjectForUpdateCommand
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
 /**
  * Rule specific for IUD operations
  */
@@ -37,9 +39,9 @@
         var isTransformed = false
         val newPlan = updatePlan transform {
           case Project(pList, child) if !isTransformed =>
-            val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList
+            var (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList
               .splitAt(pList.size - cols.size)
-            val diff = cols.diff(dest.map(_.name.toLowerCase))
+            // check complex column
             cols.foreach { col =>
               val complexExists = "\"name\":\"" + col + "\""
               if (dest.exists(m => m.dataType.json.contains(complexExists))) {
@@ -47,11 +49,20 @@
                   "Unsupported operation on Complex data type")
               }
             }
+            // check updated columns exists in table
+            val diff = cols.diff(dest.map(_.name.toLowerCase))
             if (diff.nonEmpty) {
               sys.error(s"Unknown column(s) ${ diff.mkString(",") } in table ${ table.tableName }")
             }
+            // modify plan for updated column *in place*
             isTransformed = true
-            Project(dest.filter(a => !cols.contains(a.name.toLowerCase)) ++ source, child)
+            source.foreach { col =>
+              val colName = col.name.substring(0,
+                col.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION))
+              val updateIdx = dest.indexWhere(_.name.equalsIgnoreCase(colName))
+              dest = dest.updated(updateIdx, col)
+            }
+            Project(dest, child)
         }
         CarbonProjectForUpdateCommand(
           newPlan, table.tableIdentifier.database, table.tableIdentifier.table, cols)