[CARBONDATA-3652] Make insert and select table columns equal

Why is this PR needed?

Described in https://issues.apache.org/jira/browse/CARBONDATA-3652

What changes were proposed in this PR?

Insert and select table columns should equal.

Does this PR introduce any user interface change?

No

Is any new testcase added?

Yes

This closesd #3559
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index fb1c801..8cca047 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -381,6 +381,18 @@
     assert(sql("show segments for table show_insert").collect().length == 1)
   }
 
+  test("insert and select table column number not equal") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql("DROP TABLE IF EXISTS table2")
+    sql("create table table1 (col1 string, col2 string) partitioned by(pt string) stored by 'carbondata'")
+    sql("create table table2 (t2_c1 string, t2_c2 string, t2_c3 string) partitioned by(pt string)")
+    sql("insert overwrite table table2 partition(pt=20200101) values('v11', 'v12', 'v13')")
+    val e = intercept[Exception] {
+      sql("insert into table1 select * from table2")
+    }
+    assert(e.getMessage.contains("number of columns are different"))
+  }
+
   override def afterAll {
     sql("drop table if exists load")
     sql("drop table if exists inser")
@@ -402,6 +414,8 @@
     sql("DROP TABLE IF EXISTS show_insert")
     sql("drop table if exists OverwriteTable_t1")
     sql("drop table if exists OverwriteTable_t2")
+    sql("drop table if exists table1")
+    sql("drop table if exists table2")
 
     if (timeStampPropOrig != null) {
       CarbonProperties.getInstance()
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 0c6b5aa..5323293 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -285,7 +285,7 @@
     }
     // In spark, PreprocessTableInsertion rule has below cast logic.
     // It was missed in carbon when implemented insert into rules.
-    var newChildOutput = if (child.output.size >= carbonDSRelation.carbonRelation.output.size) {
+    var newChildOutput = if (child.output.size == carbonDSRelation.carbonRelation.output.size) {
       val expectedOutput = carbonDSRelation.carbonRelation.output
       child.output.zip(expectedOutput).map {
         case (actual, expected) =>