[CARBONDATA-3666] Avoided listing of table dir in refresh command

Why is this PR needed?
Currently if a refresh command is fired on a parquet table using carbon session then carbon will list all the tables and check whether the table exists or not, then we check if the schema file exists or not by listing the Metadata folder. This can be a problem in cloud scenarios as the listing on S3 is slow.

What changes were proposed in this PR?
get the metadata for the specified table, Then go for table listing only if the provider is carbon or the table is not registered in hive

Does this PR introduce any user interface change?
No

Is any new testcase added?
No

This closes #3581
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index f0fe08b..e70fc24 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -32,6 +32,7 @@
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.{SparkContext, SparkEnv}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField}
 import org.apache.spark.util.FileUtils
 
@@ -832,4 +833,13 @@
     }
     displaySize
   }
+
+  def isCarbonDataSource(catalogTable: CatalogTable): Boolean = {
+    catalogTable.provider match {
+      case Some(x) => x.equalsIgnoreCase("org.apache.spark.sql.CarbonSource") ||
+                      x.equalsIgnoreCase("carbondata")
+      case None => false
+    }
+  }
+
 }
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
index e3f1d3f..968738a 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -35,6 +35,8 @@
 
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
+import org.apache.carbondata.spark.util.CommonUtil
+
 /**
  * This class refresh the relation from cache if the carbontable in
  * carbon catalog is not same as cached carbon relation's carbon table.
@@ -59,20 +61,16 @@
      * Set the stats to none in case of carbontable
      */
     def setStatsNone(catalogTable: CatalogTable): Unit = {
-      catalogTable.provider match {
-        case Some(provider)
-          if provider.equals("org.apache.spark.sql.CarbonSource") ||
-             provider.equalsIgnoreCase("carbondata") =>
-          // Update stats to none in case of carbon table as we are not expecting any stats from
-          // Hive. Hive gives wrong stats for carbon table.
-          catalogTable.stats match {
-            case Some(stats) =>
-              CarbonReflectionUtils.setFieldToCaseClass(catalogTable, "stats", None)
-            case _ =>
-          }
-          isRelationRefreshed =
-            CarbonEnv.isRefreshRequired(catalogTable.identifier)(sparkSession)
-        case _ =>
+      if (CommonUtil.isCarbonDataSource(catalogTable)) {
+        // Update stats to none in case of carbon table as we are not expecting any stats from
+        // Hive. Hive gives wrong stats for carbon table.
+        catalogTable.stats match {
+          case Some(stats) =>
+            CarbonReflectionUtils.setFieldToCaseClass(catalogTable, "stats", None)
+          case _ =>
+        }
+        isRelationRefreshed =
+          CarbonEnv.isRefreshRequired(catalogTable.identifier)(sparkSession)
       }
     }
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index 9251cf0..17e628f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -24,6 +24,7 @@
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, MetadataCommand}
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
@@ -41,6 +42,7 @@
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus, RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent}
+import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Command to register carbon table from existing carbon table data
@@ -52,24 +54,31 @@
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
     val databaseName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     setAuditTable(databaseName, tableName)
     // Steps
-    // 1. get table path
-    // 2. perform the below steps
-    // 2.1 check if the table already register with hive then ignore and continue with the next
-    // schema
+    // 1. Get Table Metadata from spark.
+    // 2 Perform below steps:
+    // 2.1 If table exists then check if provider if carbon. If yes then go for carbon
+    // refresh otherwise no need to do anything.
+    // 2.1.1 If table does not exists then consider the table as carbon and check for schema file
+    // existence.
     // 2.2 register the table with the hive check if the table being registered has aggregate table
     // then do the below steps
     // 2.2.1 validate that all the aggregate tables are copied at the store location.
     // 2.2.2 Register the aggregate tables
-    val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName.toLowerCase)(sparkSession)
-    val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName.toLowerCase)
     // 2.1 check if the table already register with hive then ignore and continue with the next
     // schema
-    if (!sparkSession.sessionState.catalog.listTables(databaseName)
-      .exists(_.table.equalsIgnoreCase(tableName))) {
+    val isCarbonDataSource = try {
+      CommonUtil.isCarbonDataSource(sparkSession.sessionState.catalog
+        .getTableMetadata(TableIdentifier(tableName, databaseNameOp)))
+    } catch {
+      case _: NoSuchTableException =>
+        true
+    }
+    if (isCarbonDataSource) {
+      val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName.toLowerCase)(sparkSession)
+      val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName.toLowerCase)
       // check the existence of the schema file to know its a carbon table
       val schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
       // if schema file does not exist then the table will either non carbon table or stale
@@ -106,9 +115,7 @@
         }
       }
     }
-    RefreshTable(
-      TableIdentifier(identifier.getTableName, Option(identifier.getDatabaseName))
-    ).run(sparkSession)
+    RefreshTable(TableIdentifier(tableName, Option(databaseName))).run(sparkSession)
   }
 
   /**
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 80d3044..68f7442 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -31,6 +31,8 @@
 import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Carbon strategies for ddl commands
@@ -147,20 +149,20 @@
         if isCarbonTable(truncateTable.tableName) =>
         ExecutedCommandExec(CarbonTruncateCommand(truncateTable)) :: Nil
       case createTable@org.apache.spark.sql.execution.datasources.CreateTable(_, _, None)
-        if isCarbonDataSourceTable(createTable.tableDesc) =>
+        if CommonUtil.isCarbonDataSource(createTable.tableDesc) =>
         ExecutedCommandExec(DDLHelper.createDataSourceTable(createTable, sparkSession)) :: Nil
       case MatchCreateDataSourceTable(tableDesc, mode, query)
-        if isCarbonDataSourceTable(tableDesc) =>
+        if CommonUtil.isCarbonDataSource(tableDesc) =>
         ExecutedCommandExec(
           DDLHelper.createDataSourceTableAsSelect(tableDesc, query, mode, sparkSession)
         ) :: Nil
       case org.apache.spark.sql.execution.datasources.CreateTable(tableDesc, mode, query)
-        if isCarbonDataSourceTable(tableDesc) =>
+        if CommonUtil.isCarbonDataSource(tableDesc) =>
         ExecutedCommandExec(
           DDLHelper.createDataSourceTableAsSelect(tableDesc, query.get, mode, sparkSession)
         ) :: Nil
       case createTable@CreateDataSourceTableCommand(table, _)
-        if isCarbonDataSourceTable(table) =>
+        if CommonUtil.isCarbonDataSource(table) =>
         ExecutedCommandExec(
           DDLHelper.createDataSourceTable(createTable, sparkSession)
         ) :: Nil
@@ -195,12 +197,6 @@
     CarbonPlanHelper.isCarbonTable(tableIdent, sparkSession)
   }
 
-  private def isCarbonDataSourceTable(table: CatalogTable): Boolean = {
-    table.provider.get != DDLUtils.HIVE_PROVIDER &&
-    (table.provider.get.equals("org.apache.spark.sql.CarbonSource") ||
-     table.provider.get.equalsIgnoreCase("carbondata"))
-  }
-
   private def isCarbonHiveTable(table: CatalogTable): Boolean = {
     table.provider.isDefined &&
     DDLUtils.HIVE_PROVIDER == table.provider.get &&
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 6c7b1f2..15b2a51 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -52,7 +52,7 @@
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.spark.util.CarbonSparkUtil
+import org.apache.carbondata.spark.util.{CarbonSparkUtil, CommonUtil}
 
 case class MetaData(var carbonTables: ArrayBuffer[CarbonTable]) {
   // use to lock the carbonTables
@@ -216,12 +216,9 @@
               "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
         val catalogTable =
           CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", c).asInstanceOf[CatalogTable]
-        catalogTable.provider match {
-          case Some(name) if (name.equals("org.apache.spark.sql.CarbonSource")
-            || name.equalsIgnoreCase("carbondata")) => name
-          case _ =>
-            CarbonMetadata.getInstance().removeTable(database, tableIdentifier.table)
-            throw new NoSuchTableException(database, tableIdentifier.table)
+        if (!CommonUtil.isCarbonDataSource(catalogTable)) {
+          CarbonMetadata.getInstance().removeTable(database, tableIdentifier.table)
+          throw new NoSuchTableException(database, tableIdentifier.table)
         }
         val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
            catalogTable.location.toString, database, tableIdentifier.table)
@@ -540,11 +537,8 @@
               "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
         val catalogTable =
           CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", c).asInstanceOf[CatalogTable]
-        catalogTable.provider match {
-          case Some(name) if (name.equals("org.apache.spark.sql.CarbonSource")
-            || name.equalsIgnoreCase("carbondata")) => name
-          case _ =>
-            throw new NoSuchTableException(tableIdentifier.database.get, tableIdentifier.table)
+        if (!CommonUtil.isCarbonDataSource(catalogTable)) {
+          throw new NoSuchTableException(tableIdentifier.database.get, tableIdentifier.table)
         }
         val tableLocation = catalogTable.storage.locationUri match {
           case tableLoc@Some(uri) =>