[CARBONDATA-4192] UT cases correction for validating the exception message correctly

Why is this PR needed?
Currently, when we check the exception message like below, it is not asserting/failing/
catching if the message content is different.
`intercept[UnsupportedOperationException](
 sql("update test set(a)=(4) where id=1").collect()).getMessage.contains("abc")`

What changes were proposed in this PR?
1. Added assert condition like below for validating the exception message correctly
   `assert(intercept[UnsupportedOperationException](
    sql("update test set(a)=(4) where id=1").collect()).getMessage.contains("abc"))`
2. Added assert condition to check exception message for some test cases which are
   not checking exception message
3. Fixed add segment doc heading related issues

This closes #4140
diff --git a/docs/addsegment-guide.md b/docs/addsegment-guide.md
index a45e6d6..78b15e6 100644
--- a/docs/addsegment-guide.md
+++ b/docs/addsegment-guide.md
@@ -17,19 +17,19 @@
 
 # Heterogeneous format segments in carbondata
 
-###Background
+### Background
 In the industry, many users already adopted to data with different formats like ORC, Parquet, JSON, CSV etc.,  
 If users want to migrate to Carbondata for better performance or for better features then there is no direct way. 
 All the existing data needs to be converted to Carbondata to migrate.  
 This solution works out if the existing data is less, what if the existing data is more?   
 Heterogeneous format segments aims to solve this problem by avoiding data conversion.
 
-###Add segment with path and format
+### Add segment with path and format
 Users can add the existing data as a segment to the carbon table provided the schema of the data
  and the carbon table should be the same. 
 
 ```
-Alter table table_name add segment options (‘path’= 'hdfs://usr/oldtable,'format'=parquet)
+alter table table_name add segment options ('path'= 'hdfs://usr/oldtable','format'='parquet')
 ```
 In the above command user can add the existing data to the carbon table as a new segment and also
  can provide the data format.
@@ -37,21 +37,21 @@
 During add segment, it will infer the schema from data and validates the schema against the carbon table. 
 If the schema doesn’t match it throws an exception.
 
-###Changes to tablestatus file
-Carbon adds the new segment by adding segment information to tablestatus file. In order to add the path and format information to tablestatus, we are going to add `segmentPath`  and ‘format’  to the tablestatus file. 
+### Changes to tablestatus file
+Carbon adds the new segment by adding segment information to tablestatus file. In order to add the path and format information to tablestatus, we are going to add `segmentPath`  and `format`  to the tablestatus file. 
 And any extra `options` will be added to the segment file.
 
 
-###Changes to Spark Integration
+### Changes to Spark Integration
 During select query carbon reads data through RDD which is created by
   CarbonDatasourceHadoopRelation.buildScan, This RDD reads data from physical carbondata files and provides data to spark query plan.
 To support multiple formats per segment basis we can create multiple RDD using the existing Spark
  file format scan class FileSourceScanExec . This class can generate scan RDD for all spark supported formats. We can union all these multi-format RDD and create a single RDD and provide it to spark query plan.
 
-Note: This integration will be clean as we use the sparks optimized reading, pruning and it
+**Note**: This integration will be clean as we use the sparks optimized reading, pruning and it
  involves whole codegen and vector processing with unsafe support.
 
-###Changes to Presto Integration
+### Changes to Presto Integration
 CarbondataSplitManager can create the splits for carbon and as well as for other formats and 
  choose the page source as per the split.  
 
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
index 85ad0db..9e499e7 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
@@ -263,23 +263,25 @@
   }
   test("test si creation with struct and map type") {
     sql("create table complextable (country struct<b:string>, name string, id Map<string, string>, arr1 array<string>, arr2 array<string>) stored as carbondata")
-    intercept[RuntimeException] {
+    val errMsg = "one or more specified index cols either does not exist or not a key column or " +
+                 "complex column in table"
+    assert(intercept[RuntimeException] {
       sql("create index index_1 on table complextable(country) as 'carbondata'")
-    }
-    intercept[RuntimeException] {
+    }.getMessage.contains(errMsg))
+    assert(intercept[RuntimeException] {
       sql("create index index_1 on table complextable(id) as 'carbondata'")
-    }
-    intercept[RuntimeException] {
+    }.getMessage.contains(errMsg))
+    assert(intercept[RuntimeException] {
       sql("create index index_1 on table complextable(arr1, arr2) as 'carbondata'")
-    }
+    }.getMessage.contains("SI creation with more than one complex type is not supported yet"))
   }
 
   test("test si creation with array") {
     sql("create table complextable (id int, name string, country array<array<string>>, add array<int>) stored as carbondata")
     sql("drop index if exists index_1 on complextable")
-    intercept[RuntimeException] {
+    assert(intercept[RuntimeException] {
       sql("create index index_1 on table complextable(country) as 'carbondata'")
-    }.getMessage.contains("SI creation with nested array complex type is not supported yet")
+    }.getMessage.contains("SI creation with nested array complex type is not supported yet"))
   }
 
   test("test complex with null and empty data") {
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
index 66ce678..c67f46b 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
@@ -60,9 +60,9 @@
 
   test("Testing SI on partition column") {
     sql("drop index if exists indextable1 on uniqdata1")
-    intercept[UnsupportedOperationException] {
+    assert(intercept[UnsupportedOperationException] {
       sql("create index indextable1 on table uniqdata1 (ACTIVE_EMUI_VERSION) AS 'carbondata'")
-    }
+    }.getMessage.contains("Secondary Index cannot be created on a partition column"))
   }
 
   test("Testing SI without partition column") {
@@ -328,9 +328,9 @@
     checkAnswer(sql(
       "select count(*) from uniqdata1 where CUST_ID='9000' and ACTIVE_EMUI_VERSION = 'abc'"),
       Seq(Row(4)))
-    intercept[RuntimeException] {
+    assert(intercept[RuntimeException] {
       sql("update uniqdata1 d set (d.CUST_ID) = ('8000')  where d.CUST_ID = '9000'").collect()
-    }
+    }.getMessage.contains("Update is not permitted on table that contains secondary index"))
   }
 
   test("Testing SI on partition table with rename") {
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoQueryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoQueryTest.scala
index ddfbdf7..57394aa 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoQueryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoQueryTest.scala
@@ -60,28 +60,28 @@
     loadData()
     createPolygonTable
     // verify empty data on polygon table
-    intercept[RuntimeException] {
+    assert(intercept[RuntimeException] {
       sql(s"select longitude, latitude from $geoTable where IN_POLYGON_LIST(" +
           s"'select polygon from $polygonTable','OR')").collect()
-    }.getMessage.contains("polygon list need at least 2 polygons, really has 0")
+    }.getMessage.contains("polygon list need at least 2 polygons, really has 0"))
     sql(s"insert into $polygonTable select 'POLYGON ((120.176433 30.327431,120.171283 30.322245," +
         s"120.181411 30.314540, 120.190509 30.321653,120.185188 30.329358,120.176433 30.327431))" +
         s"','abc','1'")
-    intercept[RuntimeException] {
+    assert(intercept[RuntimeException] {
       sql(s"select longitude, latitude from $geoTable where IN_POLYGON_LIST(" +
           s"'select polygon from $polygonTable','OR')").collect()
-    }.getMessage.contains("polygon list need at least 2 polygons, really has 1")
+    }.getMessage.contains("polygon list need at least 2 polygons, really has 1"))
     sql(s"insert into $polygonTable select 'POLYGON ((120.176433 30.327431,120.171283 30.322245," +
         s"120.181411 30.314540, 120.190509 30.321653,120.185188 30.329358,120.176433 30.327431))" +
         s"','abc','1'")
-    intercept[UnsupportedOperationException] {
+    assert(intercept[UnsupportedOperationException] {
       sql(s"select longitude, latitude from $geoTable where IN_POLYGON_LIST(" +
           s"'select polygon,poiId from $polygonTable','OR')").collect()
-    }.getMessage.contains("More than one column exists in the query for Polygon List Udf")
-    intercept[RuntimeException] {
+    }.getMessage.contains("More than one column exists in the query for Polygon List Udf"))
+    assert(intercept[RuntimeException] {
       sql(s"select longitude, latitude from $geoTable where IN_POLYGON_LIST(" +
           s"'select poiId from $polygonTable','OR')").collect()
-    }.getMessage.contains("polygon list need at least 2 polygons, really has 0")
+    }.getMessage.contains("polygon list need at least 2 polygons, really has 0"))
   }
 
   test("test polygon line udf with select query as input") {
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index 3913ded..7541d3f 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -681,10 +681,14 @@
 
     sql("alter table addSegCar add segment " +
         s"options('path'='${table1.location}', 'format'='parquet')")
-    intercept[Exception] {
+    assert(intercept[Exception] {
       sql("alter table addSegCar add segment " +
           s"options('path'='${table2.location}', 'format'='parquet')")
-    }
+    }.getMessage
+      .contains(
+        "Schema is not same. Table schema is : StructType(StructField(a,IntegerType,true), " +
+        "StructField(b,StringType,true)) and segment schema is : StructType(StructField(a," +
+        "IntegerType,false))"))
     sql("alter table addSegCar add segment " +
         s"options('path'='${table3.location}', 'format'='parquet')")
 
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableWithColumnMetCacheAndCacheLevelProperty.scala
index e1e4108..ffa4715 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableWithColumnMetCacheAndCacheLevelProperty.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableWithColumnMetCacheAndCacheLevelProperty.scala
@@ -52,9 +52,12 @@
   }
 
   test("validate column_meta_cache with only empty spaces - alter_column_meta_cache_01") {
-    intercept[RuntimeException] {
+    assert(intercept[RuntimeException] {
       sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='    ')")
-    }
+    }.getMessage
+      .contains(
+        "Alter table newProperties operation failed: Invalid value: Empty column names for the " +
+        "option(s)"))
   }
 
   test("validate the property with characters in different cases - alter_column_meta_cache_02") {
@@ -64,16 +67,22 @@
 
   test("validate column_meta_cache with intermediate empty string between columns " +
        "- alter_column_meta_cache_03") {
-    intercept[RuntimeException] {
+    assert(intercept[RuntimeException] {
       sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c2,  ,c3')")
-    }
+    }.getMessage
+      .contains(
+        "Alter table newProperties operation failed: Invalid value: Empty column names for the " +
+        "option(s)"))
   }
 
   test("validate column_meta_cache with combination of valid and invalid columns " +
        "- alter_column_meta_cache_04") {
-    intercept[RuntimeException] {
+    assert(intercept[RuntimeException] {
       sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c2,c10')")
-    }
+    }.getMessage
+      .contains(
+        "Alter table newProperties operation failed: Column c10 does not exists in the table " +
+        "alter_column_meta_cache"))
   }
 
   test("validate column_meta_cache for dimensions and measures - alter_column_meta_cache_05") {
@@ -85,21 +94,28 @@
   }
 
   test("validate for duplicate column names - alter_column_meta_cache_06") {
-    intercept[RuntimeException] {
+    assert(intercept[RuntimeException] {
       sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c2,c2,c3')")
-    }
+    }.getMessage
+      .contains("Alter table newProperties operation failed: Duplicate column name found : c2"))
   }
 
   test("validate column_meta_cache for complex struct type - alter_column_meta_cache_07") {
-    intercept[RuntimeException] {
+    assert(intercept[RuntimeException] {
       sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c5')")
-    }
+    }.getMessage
+      .contains(
+        "Alter table newProperties operation failed: c5 is a complex type column and complex type" +
+        " is not allowed for the option(s)"))
   }
 
   test("validate column_meta_cache for complex array type - alter_column_meta_cache_08") {
-    intercept[RuntimeException] {
+    assert(intercept[RuntimeException] {
       sql("Alter table alter_column_meta_cache SET TBLPROPERTIES('column_meta_cache'='c5,c2')")
-    }
+    }.getMessage
+      .contains(
+        "Alter table newProperties operation failed: c5 is a complex type column and complex type" +
+        " is not allowed for the option(s): column_meta_cache"))
   }
 
   test("validate column_meta_cache with empty value - alter_column_meta_cache_09") {
@@ -134,15 +150,21 @@
   }
 
   test("validate cache_level with only empty spaces - ALTER_CACHE_LEVEL_01") {
-    intercept[RuntimeException] {
+    assert(intercept[RuntimeException] {
       sql("Alter table cache_level SET TBLPROPERTIES('cache_level'='    ')")
-    }
+    }.getMessage
+      .contains(
+        "Alter table newProperties operation failed: Invalid value: Empty column names for the " +
+        "option(s)"))
   }
 
   test("validate cache_level with invalid values - ALTER_CACHE_LEVEL_02") {
-    intercept[RuntimeException] {
+    assert(intercept[RuntimeException] {
       sql("Alter table cache_level SET TBLPROPERTIES('cache_level'='xyz,abc')")
-    }
+    }.getMessage
+      .contains(
+        "Alter table newProperties operation failed: Invalid value: Allowed values for " +
+        "cache_level are BLOCK AND BLOCKLET"))
   }
 
   test("validate cache_level with property in different cases - ALTER_CACHE_LEVEL_03") {
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
index c5589fe..e7cc230 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
@@ -239,9 +239,9 @@
 
     checkAnswer(sql(s"""select count(*) from cleantest"""),
       Seq(Row(4)))
-    intercept[RuntimeException] {
+    assert(intercept[RuntimeException] {
       sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
-    }
+    }.getMessage.contains("Clean files with force operation not permitted by default"))
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
     sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
@@ -282,9 +282,9 @@
     list = getFileCountInTrashFolder(trashFolderPath)
     assert(list == 4)
 
-    intercept[RuntimeException] {
+    assert(intercept[RuntimeException] {
       sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
-    }
+    }.getMessage.contains("Clean files with force operation not permitted by default"))
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
     sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
index 7f2a0e1..7f8ef03 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
@@ -121,9 +121,9 @@
     list = getFileCountInTrashFolder(trashFolderPath)
     assert(list == 4)
 
-    intercept[RuntimeException] {
+    assert(intercept[RuntimeException] {
       sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
-    }
+    }.getMessage.contains("Clean files with force operation not permitted by default"))
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
     sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
@@ -274,9 +274,9 @@
 
     checkAnswer(sql(s"""select count(*) from cleantest"""),
       Seq(Row(4)))
-    intercept[RuntimeException] {
+    assert(intercept[RuntimeException] {
       sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
-    }
+    }.getMessage.contains("Clean files with force operation not permitted by default"))
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
     sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index c7e59dd..1adb13d 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -443,9 +443,9 @@
     assert(new File(writerPath).exists())
     cleanTestData()
 
-    intercept[RuntimeException] {
+    assert(intercept[RuntimeException] {
       buildTestDataWithSortColumns(List(""))
-    }
+    }.getMessage.contains("column:  specified in sort columns does not exist in schema"))
 
     assert(!(new File(writerPath).exists()))
     cleanTestData()
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/TestIndexCommand.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/TestIndexCommand.scala
index 602f1be..ea6cf69 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/TestIndexCommand.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/TestIndexCommand.scala
@@ -40,22 +40,31 @@
   val newClass = "org.apache.spark.sql.CarbonSource"
 
   test("test index create: don't support using non-exist class") {
-    intercept[MetadataProcessException] {
+    assert(intercept[MetadataProcessException] {
       sql(s"CREATE INDEX index1 ON indextest (a) AS '$newClass'")
-    }
+    }.getMessage
+      .contains(
+        "failed to create IndexClassProvider 'org.apache.spark.sql.CarbonSource': wrong number of" +
+        " arguments"))
   }
 
   test("test index create with properties: don't support using non-exist class") {
-    intercept[MetadataProcessException] {
+    assert(intercept[MetadataProcessException] {
       sql(s"CREATE INDEX index2 ON indextest (a) AS '$newClass' PROPERTIES('key'='value')")
-    }
+    }.getMessage
+      .contains(
+        "failed to create IndexClassProvider 'org.apache.spark.sql.CarbonSource': wrong number of" +
+        " arguments"))
   }
 
   test("test index create with existing name: don't support using non-exist class") {
-    intercept[MetadataProcessException] {
+    assert(intercept[MetadataProcessException] {
       sql(
         s"CREATE INDEX index2 ON indextest (a) AS '$newClass' PROPERTIES('key'='value')")
-    }
+    }.getMessage
+      .contains(
+        "failed to create IndexClassProvider 'org.apache.spark.sql.CarbonSource': wrong number of" +
+        " arguments"))
   }
 
   test("test show indexes with no index") {
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 4ad3167..987e5fa 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -237,12 +237,12 @@
     sql("insert into t1 select 1, 'Andy'")
     sql("create table t2 (age int, name string) STORED AS carbondata")
     sql("insert into t2 select 3, 'Andy'")
-    intercept[AnalysisException] {
+    assert(intercept[AnalysisException] {
       sql("update t1 set (age) = " +
           "(select t2.age from t2 where t2.name = t1.name limit 1) " +
           "where t1.age = 1 ").collect()
-    }.getMessage.contains("Update subquery has join with maintable " +
-                          "and limit leads to multiple join for each limit for each row")
+    }.getMessage.contains("Update subquery has join with main table " +
+                          "and limit leads to multiple join for each limit for each row"))
     sql("drop table if exists t1")
     sql("drop table if exists t2")
   }
@@ -297,11 +297,11 @@
     sql("insert into t2 select 2, 'Andy'")
     sql("insert into t2 select 1, 'aa'")
     sql("insert into t2 select 3, 'aa'")
-    intercept[AnalysisException] {
+    assert(intercept[AnalysisException] {
       sql("update t1 set (age) = " +
           "(select t2.age from t2 where t2.name = 'Andy') where t1.age = 1 ").collect()
     }.getMessage.contains("update cannot be supported for 1 to N mapping, " +
-                          "as more than one value present for the update key")
+                          "as more than one value present for the update key"))
     // test join scenario
     val exception1 = intercept[RuntimeException] {
       sql("update t1 set (age) = (select t2.age from t2 where t2.name = t1.name) ").collect()
@@ -555,9 +555,9 @@
   }
 
   test("update carbon table-error[no set columns") {
-    intercept[Exception] {
+    assert(intercept[Exception] {
       sql("""update iud.dest d set () = ()""").collect()
-    }
+    }.getMessage.contains("At least one source column has to be specified"))
   }
 
   test("update carbon table-error[no set columns with updated column") {
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
index 3fc4c62..89c2934 100644
--- a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
@@ -355,15 +355,25 @@
     checkExistence(sql("desc restructure"), true, "intfield", "bigint")
     sql("alter table default.restructure change decimalfield deciMalfield Decimal(11,3)")
     sql("alter table default.restructure change decimalfield deciMalfield Decimal(12,3)")
-    intercept[ProcessMetaDataException] {
+    assert(intercept[ProcessMetaDataException] {
       sql("alter table default.restructure change decimalfield deciMalfield Decimal(12,2)")
-    }
-    intercept[ProcessMetaDataException] {
+    }.getMessage
+      .contains(
+        "Alter table data type change operation " +
+        "failed: Given column decimalfield cannot be modified. Specified precision value 12 " +
+        "should be greater than current precision value 12"))
+    assert(intercept[ProcessMetaDataException] {
       sql("alter table default.restructure change decimalfield deciMalfield Decimal(13,1)")
-    }
-    intercept[ProcessMetaDataException] {
+    }.getMessage
+      .contains(
+        "Alter table data type change operation failed: Given column decimalfield cannot be " +
+        "modified. Specified scale value 1 should be greater or equal to current scale value 3"))
+    assert(intercept[ProcessMetaDataException] {
       sql("alter table default.restructure change decimalfield deciMalfield Decimal(13,5)")
-    }
+    }.getMessage
+      .contains(
+        "Alter table data type change operation failed: Given column decimalfield cannot be " +
+        "modified. Specified precision and scale values will lead to data loss"))
     sql("alter table default.restructure change decimalfield deciMalfield Decimal(13,4)")
   }
 
@@ -775,9 +785,12 @@
         s"('sort_columns'='')")
 
     // This throws exception as SORT_COLUMNS is empty
-    intercept[RuntimeException] {
+    assert(intercept[RuntimeException] {
       sql("ALTER TABLE t1 SET TBLPROPERTIES('sort_scope'='local_sort')")
-    }
+    }.getMessage
+      .contains(
+        "Alter table newProperties operation failed: Cannot set SORT_SCOPE as local_sort when " +
+        "table has no SORT_COLUMNS"))
 
     // Even if we change the SORT_SCOPE to LOCAL_SORT
     // the SORT_SCOPE should remain to NO_SORT
@@ -803,9 +816,12 @@
     sql("DROP TABLE IF EXISTS t1")
     sql(s"CREATE TABLE t1(age int, name string) STORED AS carbondata TBLPROPERTIES" +
         s"('sort_scope'='local_sort', 'sort_columns'='age')")
-    intercept[RuntimeException] {
+    assert(intercept[RuntimeException] {
       sql("ALTER TABLE t1 SET TBLPROPERTIES('sort_scope'='fake_sort')")
-    }
+    }.getMessage
+      .contains(
+        "Alter table newProperties operation failed: Invalid SORT_SCOPE fake_sort, valid " +
+        "SORT_SCOPE are 'NO_SORT', 'LOCAL_SORT' and 'GLOBAL_SORT"))
 
     // SORT_SCOPE should remain unchanged
     assert(sortScopeInDescFormatted("t1").equalsIgnoreCase("LOCAL_SORT"))
diff --git a/integration/spark/src/test/scala/org/apache/spark/sql/execution/command/CarbonTableSchemaCommonSuite.scala b/integration/spark/src/test/scala/org/apache/spark/sql/execution/command/CarbonTableSchemaCommonSuite.scala
index dbb120f..d907222 100644
--- a/integration/spark/src/test/scala/org/apache/spark/sql/execution/command/CarbonTableSchemaCommonSuite.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/sql/execution/command/CarbonTableSchemaCommonSuite.scala
@@ -55,14 +55,14 @@
          | STORED AS carbondata
        """.stripMargin)
 
-    val ex = intercept[ProcessMetaDataException] {
+    assert(intercept[ProcessMetaDataException] {
       sql(
         s"""
            | alter TABLE carbon_table add columns(
            | bb char(10)
             )
        """.stripMargin)
-    }
+    }.getMessage.contains("Alter table add operation failed: Duplicate column found with name: bb"))
     sql("DROP TABLE IF EXISTS carbon_table")
   }