[CARBONDATA-3875] Support show segments with stage

Why is this PR needed?
Currently there is no way to monitor stage information. A 'SHOW SEGMENTS WITH STAGE' command should be supported to provide monitoring information such as create time, partition info, etc.

What changes were proposed in this PR?
Added 'WITH STAGE' semantics to the SHOW SEGMENTS flow, which collects stage information by reading the stage files.

Does this PR introduce any user interface change?
Yes.
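The SHOW SEGMENTS command accepts an optional WITH STAGE clause. A minimal usage sketch of the new syntax (the database and table names below are hypothetical):

```sql
-- List loaded segments together with unloaded/loading stages
SHOW SEGMENTS ON db1.sink_table WITH STAGE;

-- Query segments and stages through the generated <table>_segments view
SHOW SEGMENTS ON db1.sink_table WITH STAGE AS
  (SELECT * FROM sink_table_segments WHERE status = 'Unload');
```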

Is any new testcase added?
Yes.

This closes #3798
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java
index 10dd51d..893b962 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java
@@ -45,6 +45,16 @@
    */
   private List<PartitionLocation> locations;
 
+  /**
+   * time at which this stage was created.
+   */
+  private transient long createTime;
+
+  /**
+   * status of the stage: Unload or Loading.
+   */
+  private StageStatus status;
+
   public StageInput() {
 
   }
@@ -83,6 +93,14 @@
     this.locations = locations;
   }
 
+  public StageStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(StageStatus status) {
+    this.status = status;
+  }
+
   public List<InputSplit> createSplits() {
     return
         files.entrySet().stream().filter(
@@ -94,6 +112,14 @@
         ).collect(Collectors.toList());
   }
 
+  public long getCreateTime() {
+    return createTime;
+  }
+
+  public void setCreateTime(long createTime) {
+    this.createTime = createTime;
+  }
+
   public static final class PartitionLocation {
 
     public PartitionLocation() {
@@ -133,4 +159,8 @@
 
   }
 
+  public enum StageStatus {
+    Unload, Loading
+  }
+
 }
diff --git a/docs/segment-management-on-carbondata.md b/docs/segment-management-on-carbondata.md
index 3ef0a3a..6c144b1 100644
--- a/docs/segment-management-on-carbondata.md
+++ b/docs/segment-management-on-carbondata.md
@@ -32,7 +32,7 @@
 
   ```
   SHOW [HISTORY] SEGMENTS
-  [FOR TABLE | ON] [db_name.]table_name [LIMIT number_of_segments]
+  [FOR TABLE | ON] [db_name.]table_name [WITH STAGE] [LIMIT number_of_segments]
   [AS (select query from table_name_segments)]
   ```
 
@@ -65,6 +65,12 @@
   SHOW HISTORY SEGMENTS ON CarbonDatabase.CarbonTable
   ```
 
+  Show all segments, including stages.
+  The stage status is 'Unload' or 'Loading'; the stage ID and load time taken are null.
+  ```
+  SHOW SEGMENTS ON CarbonDatabase.CarbonTable WITH STAGE
+  ```
+
 
   When more detail of the segment is required, user can issue SHOW SEGMENT by query.    
     
@@ -99,6 +105,9 @@
   
   SHOW SEGMENTS ON CarbonTable AS
   SELECT avg(timeTakenMs) FROM CarbonTable_segments  
+  
+  SHOW SEGMENTS ON CarbonTable WITH STAGE AS
+  SELECT avg(timeTakenMs) FROM CarbonTable_segments
   ```
 
 
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
index 61c4121..5e82b96 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
@@ -17,17 +17,10 @@
 
 package org.apache.carbon.flink
 
-import java.io.{File, InputStreamReader}
-import java.util
-import java.util.concurrent.{Callable, Executors}
-import java.util.{Base64, Collections, Properties}
+import java.text.SimpleDateFormat
+import java.util.concurrent.Executors
+import java.util.{Base64, Properties}
 
-import com.google.gson.Gson
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.statusmanager.StageInput
-import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.core.fs.Path
@@ -35,11 +28,15 @@
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
-import scala.collection.JavaConverters._
-
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
+
+import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
 import org.scalatest.BeforeAndAfterAll
 
 class TestCarbonPartitionWriter extends QueryTest with BeforeAndAfterAll{
@@ -69,6 +66,113 @@
     }
   }
 
+  test("Show segments with stage") {
+    createPartitionTable
+    try {
+      val tablePath = storeLocation + "/" + tableName + "/"
+      val tableStagePath = CarbonTablePath.getStageDir(tablePath)
+      val writerProperties = newWriterProperties(dataTempPath)
+      val carbonProperties = newCarbonProperties(storeLocation)
+
+      val environment = StreamExecutionEnvironment.getExecutionEnvironment
+      environment.enableCheckpointing(2000L)
+      val dataCount = 1000
+      val source = getTestSource(dataCount)
+      executeStreamingEnvironment(tablePath, writerProperties, carbonProperties, environment, source)
+
+      // 1. Test "SHOW SEGMENTS ON $tableName WITH STAGE"
+      var rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE").collect()
+      var unloadedStageCount = CarbonStore.listStageFiles(tableStagePath)._1.length
+      assert(rows.length == unloadedStageCount)
+      for (index <- 0 until unloadedStageCount) {
+        assert(rows(index).getString(0) == null)
+        assert(rows(index).getString(1).equals("Unload"))
+        assert(rows(index).getString(2) != null)
+        assert(rows(index).getString(3) == null)
+        assert(!rows(index).getString(4).equals("NA"))
+        assert(rows(index).getString(5) != null)
+        assert(rows(index).getString(6) != null)
+        assert(rows(index).getString(7) == null)
+        assertShowStagesCreateTimeDesc(rows, index)
+      }
+
+      // 2. Test "SHOW SEGMENTS FOR TABLE $tableName WITH STAGE"
+      val rowsfortable = sql(s"SHOW SEGMENTS FOR TABLE $tableName WITH STAGE").collect()
+      assert(rowsfortable.length == rows.length)
+      for (index <- 0 until unloadedStageCount) {
+        assert(rows(index).toString() == rowsfortable(index).toString())
+      }
+
+      // 3. Test "SHOW SEGMENTS ON $tableName WITH STAGE AS (QUERY)"
+      rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
+        s"(SELECT * FROM $tableName" + "_segments)").collect()
+      for (index <- 0 until unloadedStageCount) {
+        val row = rows(index)
+        assert(rows(index).getString(0) == null)
+        assert(rows(index).getString(1).equals("Unload"))
+        assert(rows(index).getString(2) != null)
+        assert(rows(index).getLong(3) == -1)
+        assert(!rows(index).get(4).toString.equals("WrappedArray(NA)"))
+        assert(rows(index).getLong(5) > 0)
+        assert(rows(index).getLong(6) > 0)
+        assert(rows(index).getString(7) == null)
+        assert(rows(index).getString(8) == null)
+        assert(rows(index).getString(9) == null)
+        assert(rows(index).getString(10) == null)
+        assert(rows(index).getString(11) == null)
+        assertShowStagesCreateTimeDesc(rows, index)
+      }
+
+      // 4. Test "SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 1 AS (QUERY)"
+      //    Test "SHOW SEGMENTS ON $tableName LIMIT 1 AS (QUERY)"
+      if (unloadedStageCount > 1) {
+        sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '1')")
+
+        unloadedStageCount = CarbonStore.listStageFiles(tableStagePath)._1.length
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 1").collect()
+        assert(rows.length == unloadedStageCount + 1)
+        assert(rows(0).getString(1).equals("Unload"))
+
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 0").collect()
+        assert(rows.length == unloadedStageCount)
+        assert(rows(0).getString(1).equals("Unload"))
+
+        rows = sql(s"SHOW SEGMENTS FOR TABLE $tableName WITH STAGE LIMIT 1").collect()
+        assert(rows.length == unloadedStageCount + 1)
+        assert(rows(0).getString(1).equals("Unload"))
+
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 1 AS " +
+          s"(SELECT * FROM $tableName" + "_segments)").collect()
+        assert(rows.length == unloadedStageCount + 1)
+        assert(rows(0).getString(1).equals("Unload"))
+
+        unloadedStageCount = CarbonStore.listStageFiles(tableStagePath)._1.length
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
+          s"(SELECT * FROM $tableName" + "_segments where status = 'Unload')").collect()
+        assert(rows.length == unloadedStageCount)
+        assert(rows(0).getString(1).equals("Unload"))
+
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
+          s"(SELECT * FROM $tableName" + "_segments where status = 'Success')").collect()
+        assert(rows.length >= 1)
+        assert(rows(0).getString(1).equals("Success"))
+
+        // Create a fake loading stage file to verify the 'Loading' status
+        createFakeLoadingStage(CarbonTablePath.getStageDir(tablePath))
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
+          s"(SELECT * FROM $tableName" + "_segments where status = 'Loading')").collect()
+        assert(rows.length == 1)
+        assert(rows(0).getString(1).equals("Loading"))
+
+        var (unloadedFiles, loadingFiles) = CarbonStore.listStageFiles(tableStagePath)
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
+          s"(SELECT * FROM $tableName" + "_segments " +
+          "where status = 'Unload' or status = 'Loading')").collect()
+        assert(rows.length == unloadedFiles.length + loadingFiles.length)
+      }
+    }
+  }
+
   test("test concurrent insertstage") {
     createPartitionTable
     try {
@@ -85,11 +189,12 @@
       val source = getTestSource(dataCount)
       executeStreamingEnvironment(tablePath, writerProperties, carbonProperties, environment, source)
 
+      Thread.sleep(5000)
       val executorService = Executors.newFixedThreadPool(10)
       for(i <- 1 to 10) {
         executorService.submit(new Runnable {
           override def run(): Unit = {
-            sql(s"INSERT INTO $tableName STAGE OPTIONS('batch_file_count'='1')")
+            sql(s"INSERT INTO $tableName STAGE OPTIONS('batch_file_count'='5')")
           }
         }).get()
       }
@@ -306,56 +411,24 @@
     properties
   }
 
-  private def collectStageInputs(loadDetailsDir: String): Seq[StageInput] = {
-    val dir = FileFactory.getCarbonFile(loadDetailsDir)
-    val stageFiles = if (dir.exists()) {
-      val allFiles = dir.listFiles()
-      val successFiles = allFiles.filter { file =>
-        file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
-      }.map { file =>
-        (file.getName.substring(0, file.getName.indexOf(".")), file)
-      }.toMap
-      allFiles.filter { file =>
-        !file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
-      }.filter { file =>
-        successFiles.contains(file.getName)
-      }.map { file =>
-        (file, successFiles(file.getName))
-      }
-    } else {
-      Array.empty
+  private def assertShowStagesCreateTimeDesc(rows: Array[Row], index: Int): Unit = {
+    if (index > 0) {
+      val nowtime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").
+        parse(rows(index).getString(2)).getTime
+      val lasttime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").
+        parse(rows(index - 1).getString(2)).getTime
+      assert(nowtime <= lasttime)
     }
-
-    val output = Collections.synchronizedList(new util.ArrayList[StageInput]())
-    val gson = new Gson()
-    stageFiles.map { stage =>
-      val filePath = stage._1.getAbsolutePath
-      val stream = FileFactory.getDataInputStream(filePath)
-      try {
-        val stageInput = gson.fromJson(new InputStreamReader(stream), classOf[StageInput])
-        output.add(stageInput)
-      } finally {
-        stream.close()
-      }
-    }
-    output.asScala
   }
 
-  private def delDir(dir: File): Boolean = {
-    if (dir.isDirectory) {
-      val children = dir.list
-      if (children != null) {
-        val length = children.length
-        var i = 0
-        while (i < length) {
-          if (!delDir(new File(dir, children(i)))) {
-              return false
-          }
-          i += 1
-        }
-      }
-    }
-    dir.delete()
+  private def createFakeLoadingStage(stagePath: String): Unit = {
+    var (unloadedFiles, loadingFiles) = CarbonStore.listStageFiles(stagePath)
+    assert(unloadedFiles.length > 0)
+    val loadingFilesCountBefore = loadingFiles.length
+    FileFactory.getCarbonFile(unloadedFiles(0).getAbsolutePath +
+      CarbonTablePath.LOADING_FILE_SUBFIX).createNewFile()
+    loadingFiles = CarbonStore.listStageFiles(stagePath)._2
+    val loadingFilesCountAfter = loadingFiles.length
+    assert(loadingFilesCountAfter == loadingFilesCountBefore + 1)
   }
-
 }
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
index c40273d..74c5d94 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
@@ -17,25 +17,27 @@
 
 package org.apache.carbon.flink
 
+import java.text.SimpleDateFormat
 import java.util.Properties
 
 import org.apache.flink.api.common.JobExecutionResult
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.execution.exchange.Exchange
+import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
 
+import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.spark.sql.execution.exchange.Exchange
-import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
+
 import org.scalatest.BeforeAndAfterAll
 
 class TestCarbonWriter extends QueryTest with BeforeAndAfterAll{
@@ -241,6 +243,99 @@
     }
   }
 
+  test("Show segments with stage") {
+    createTable
+    try {
+      val tablePath = storeLocation + "/" + tableName + "/"
+      val stagePath = CarbonTablePath.getStageDir(tablePath)
+      val writerProperties = newWriterProperties(dataTempPath)
+      val carbonProperties = newCarbonProperties(storeLocation)
+
+      val environment = StreamExecutionEnvironment.getExecutionEnvironment
+      environment.enableCheckpointing(2000L)
+      executeFlinkStreamingEnvironment(environment, writerProperties, carbonProperties)
+
+      // 1. Test "SHOW SEGMENTS ON $tableName WITH STAGE"
+      var rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE").collect()
+      var unloadedStageCount = CarbonStore.listStageFiles(stagePath)._1.length
+      assert(rows.length == unloadedStageCount)
+      for (index <- 0 until unloadedStageCount) {
+        assert(rows(index).getString(0) == null)
+        assert(rows(index).getString(1).equals("Unload"))
+        assert(rows(index).getString(2) != null)
+        assert(rows(index).getString(3) == null)
+        assert(rows(index).getString(4).equals("NA"))
+        assert(rows(index).getString(5) != null)
+        assert(rows(index).getString(6) != null)
+        assert(rows(index).getString(7) == null)
+        assertShowStagesCreateTimeDesc(rows, index)
+      }
+
+      // 2. Test "SHOW SEGMENTS FOR TABLE $tableName WITH STAGE"
+      val rowsfortable = sql(s"SHOW SEGMENTS FOR TABLE $tableName WITH STAGE").collect()
+      assert(rowsfortable.length == rows.length)
+      for (index <- 0 until unloadedStageCount) {
+        assert(rows(index).toString() == rowsfortable(index).toString())
+      }
+
+      // 3. Test "SHOW SEGMENTS ON $tableName WITH STAGE AS (QUERY)"
+      rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
+        s"(SELECT * FROM $tableName" + "_segments)").collect()
+      for (index <- 0 until unloadedStageCount) {
+        val row = rows(index)
+        assert(rows(index).getString(0) == null)
+        assert(rows(index).getString(1).equals("Unload"))
+        assert(rows(index).getString(2) != null)
+        assert(rows(index).getLong(3) == -1)
+        assert(rows(index).get(4).toString.equals("WrappedArray(NA)"))
+        assert(rows(index).getLong(5) > 0)
+        assert(rows(index).getLong(6) > 0)
+        assert(rows(index).getString(7) == null)
+        assert(rows(index).getString(8) == null)
+        assert(rows(index).getString(9) == null)
+        assert(rows(index).getString(10) == null)
+        assert(rows(index).getString(11) == null)
+        assertShowStagesCreateTimeDesc(rows, index)
+      }
+
+      // 4. Test "SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 1 AS (QUERY)"
+      //    Test "SHOW SEGMENTS ON $tableName LIMIT 1 AS (QUERY)"
+
+      if (unloadedStageCount > 1) {
+        sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '1')")
+
+        unloadedStageCount = CarbonStore.listStageFiles(stagePath)._1.length
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 1").collect()
+        assert(rows.length == unloadedStageCount + 1)
+        assert(rows(0).getString(1).equals("Unload"))
+
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 0").collect()
+        assert(rows.length == unloadedStageCount)
+        assert(rows(0).getString(1).equals("Unload"))
+
+        rows = sql(s"SHOW SEGMENTS FOR TABLE $tableName WITH STAGE LIMIT 1").collect()
+        assert(rows.length == unloadedStageCount + 1)
+        assert(rows(0).getString(1).equals("Unload"))
+
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE LIMIT 1 AS " +
+          s"(SELECT * FROM $tableName" + "_segments)").collect()
+        assert(rows.length == unloadedStageCount + 1)
+        assert(rows(0).getString(1).equals("Unload"))
+
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
+          s"(SELECT * FROM $tableName" + "_segments where status = 'Unload')").collect()
+        unloadedStageCount = CarbonStore.listStageFiles(stagePath)._1.length
+        assert(rows.length == unloadedStageCount)
+        assert(rows(0).getString(1).equals("Unload"))
+
+        rows = sql(s"SHOW SEGMENTS ON $tableName WITH STAGE AS " +
+          s"(SELECT * FROM $tableName" + "_segments where status = 'Success')").collect()
+        assert(rows.length == 1)
+        assert(rows(0).getString(1).equals("Success"))
+      }
+    }
+  }
+
   private def executeFlinkStreamingEnvironment(environment: StreamExecutionEnvironment,
       writerProperties: Properties,
       carbonProperties: Properties): JobExecutionResult = {
@@ -321,4 +416,13 @@
     properties
   }
 
+  private def assertShowStagesCreateTimeDesc(rows: Array[Row], index: Int): Unit = {
+    if (index > 0) {
+      val nowtime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").
+        parse(rows(index).getString(2)).getTime
+      val lasttime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").
+        parse(rows(index - 1).getString(2)).getTime
+      assert(nowtime <= lasttime)
+    }
+  }
 }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 02d36cf..d970e00 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -17,11 +17,16 @@
 
 package org.apache.carbondata.api
 
+import java.io.InputStreamReader
 import java.time.{Duration, Instant}
+import java.util
+import java.util.{Collections, Comparator}
 
 import scala.collection.JavaConverters._
 
+import com.google.gson.Gson
 import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -35,7 +40,7 @@
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatus, SegmentStatusManager, StageInput}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.streaming.segment.StreamSegment
 
@@ -45,7 +50,7 @@
   def readSegments(
       tablePath: String,
       showHistory: Boolean,
-      limit: Option[String]): Array[LoadMetadataDetails] = {
+      limit: Option[Int]): Array[LoadMetadataDetails] = {
     val metaFolder = CarbonTablePath.getMetadataPath(tablePath)
     var segmentsMetadataDetails = if (showHistory) {
       SegmentStatusManager.readLoadMetadata(metaFolder) ++
@@ -63,13 +68,91 @@
     }
 
     if (limit.isDefined) {
-      val lim = Integer.parseInt(limit.get)
-      segmentsMetadataDetails.slice(0, lim)
+      segmentsMetadataDetails.slice(0, limit.get)
     } else {
       segmentsMetadataDetails
     }
   }
 
+  /**
+   * List all stage files of the table and read them into StageInput objects, sorted by create time (descending)
+   */
+  def readStages(tablePath: String): Seq[StageInput] = {
+    val stageFiles = listStageFiles(CarbonTablePath.getStageDir(tablePath))
+    var output = Collections.synchronizedList(new util.ArrayList[StageInput]())
+    output.addAll(readStageInput(stageFiles._1,
+      StageInput.StageStatus.Unload).asJavaCollection)
+    output.addAll(readStageInput(stageFiles._2,
+      StageInput.StageStatus.Loading).asJavaCollection)
+    Collections.sort(output, new Comparator[StageInput]() {
+      def compare(stageInput1: StageInput, stageInput2: StageInput): Int = {
+        java.lang.Long.compare(stageInput2.getCreateTime, stageInput1.getCreateTime)
+      }
+    })
+    output.asScala
+  }
+
+  /**
+   * Read the given stage files and return them as StageInput with the given stage status
+   */
+  def readStageInput(
+      stageFiles: Seq[CarbonFile],
+      status: StageInput.StageStatus): Seq[StageInput] = {
+    val gson = new Gson()
+    val output = Collections.synchronizedList(new util.ArrayList[StageInput]())
+    stageFiles.map { stage =>
+      val filePath = stage.getAbsolutePath
+      val stream = FileFactory.getDataInputStream(filePath)
+      try {
+        val stageInput = gson.fromJson(new InputStreamReader(stream), classOf[StageInput])
+        stageInput.setCreateTime(stage.getLastModifiedTime)
+        stageInput.setStatus(status)
+        output.add(stageInput)
+      } finally {
+        stream.close()
+      }
+    }
+    output.asScala
+  }
+
+  /*
+   * Collect all stage files and match them against their success and loading marker files.
+   * Returns the unloaded stage files and the loading stage files.
+   */
+  def listStageFiles(
+        loadDetailsDir: String): (Array[CarbonFile], Array[CarbonFile]) = {
+    val dir = FileFactory.getCarbonFile(loadDetailsDir)
+    if (dir.exists()) {
+      // 1. List all files in the stage directory.
+      val allFiles = dir.listFiles()
+
+      // 2. Build the stage file list.
+      // First, take the files in the stage directory,
+      //        excluding the success files and loading files.
+      // Second, keep only the stage files that have a success tag.
+      val stageFiles = allFiles.filterNot { file =>
+        file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
+      }.filterNot { file =>
+        file.getName.endsWith(CarbonTablePath.LOADING_FILE_SUBFIX)
+      }.filter { file =>
+        allFiles.exists(_.getName == file.getName + CarbonTablePath.SUCCESS_FILE_SUBFIX)
+      }.sortWith {
+        (file1, file2) => file1.getLastModifiedTime > file2.getLastModifiedTime
+      }
+      // 3. Get the unloaded stage files, which do not have a loading tag.
+      val unloadedFiles = stageFiles.filterNot { file =>
+        allFiles.exists(_.getName == file.getName + CarbonTablePath.LOADING_FILE_SUBFIX)
+      }
+      // 4. Get the loading stage files, which have a loading tag.
+      val loadingFiles = stageFiles.filter { file =>
+        allFiles.exists(_.getName == file.getName + CarbonTablePath.LOADING_FILE_SUBFIX)
+      }
+      (unloadedFiles, loadingFiles)
+    } else {
+      (Array.empty, Array.empty)
+    }
+  }
+
   def getPartitions(tablePath: String, load: LoadMetadataDetails): Seq[String] = {
     val segmentFile = SegmentFileStore.readSegmentFile(
       CarbonTablePath.getSegmentFilePath(tablePath, load.getSegmentFile))
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index cb27db9..e8420ec 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -680,6 +680,8 @@
       val stageFiles = allFiles.filter { file =>
         !file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
       }.filter { file =>
+        !file.getName.endsWith(CarbonTablePath.LOADING_FILE_SUBFIX)
+      }.filter { file =>
         successFiles.contains(file.getName)
       }.filterNot { file =>
         loadingFiles.contains(file.getName)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
index 7d1c710..f1e668e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
@@ -22,6 +22,7 @@
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.api.CarbonStore.readSegments
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
@@ -35,8 +36,9 @@
     databaseNameOp: Option[String],
     tableName: String,
     query: String,
-    limit: Option[String],
-    showHistory: Boolean = false)
+    limit: Option[Int],
+    showHistory: Boolean = false,
+    withStage: Boolean = false)
   extends DataCommand {
 
   private lazy val sparkSession = SparkSession.getActiveSession.get
@@ -48,7 +50,7 @@
 
   override def output: Seq[Attribute] = {
     df.queryExecution.analyzed.output.map { attr =>
-      AttributeReference(attr.name, attr.dataType, nullable = false)()
+      AttributeReference(attr.name, attr.dataType, nullable = true)()
     }
   }
 
@@ -72,9 +74,34 @@
 
   private def createDataFrame: DataFrame = {
     val tablePath = carbonTable.getTablePath
-    val segments = CarbonStore.readSegments(tablePath, showHistory, limit)
+    var rows: Seq[SegmentRow] = Seq()
+    if (withStage) {
+      val stageRows = CarbonShowSegmentsCommand.showStages(tablePath)
+      if (stageRows.nonEmpty) {
+        rows = stageRows.map(
+          stageRow =>
+            SegmentRow (
+              stageRow.getString(0),
+              stageRow.getString(1),
+              stageRow.getString(2),
+              -1,
+              Seq(stageRow.getString(4)),
+              stageRow.getString(5).toLong,
+              stageRow.getString(6).toLong,
+              null,
+              stageRow.getString(7),
+              null,
+              null,
+              null
+            )
+        )
+      }
+    }
+
+    val segments = readSegments(tablePath, showHistory, limit)
     val tempViewName = makeTempViewName(carbonTable)
-    registerSegmentRowView(sparkSession, tempViewName, carbonTable, segments)
+    registerSegmentRowView(sparkSession, tempViewName,
+      carbonTable, segments, rows)
     try {
       sparkSession.sql(query)
     } catch {
@@ -95,11 +122,12 @@
       sparkSession: SparkSession,
       tempViewName: String,
       carbonTable: CarbonTable,
-      segments: Array[LoadMetadataDetails]): Unit = {
+      segments: Array[LoadMetadataDetails],
+      rows: Seq[SegmentRow]): Unit = {
 
     // populate a dataframe containing all segment information
     val tablePath = carbonTable.getTablePath
-    val segmentRows = segments.toSeq.map { segment =>
+    val segmentRowView = rows ++ segments.toSeq.map { segment =>
       val mergedToId = CarbonStore.getMergeTo(segment)
       val path = CarbonStore.getExternalSegmentPath(segment)
       val startTime = CarbonStore.getLoadStartTime(segment)
@@ -123,7 +151,7 @@
     }
 
     // create a temp view using the populated dataframe and execute the query on it
-    val df = sparkSession.createDataFrame(segmentRows)
+    val df = sparkSession.createDataFrame(segmentRowView)
     checkIfTableExist(sparkSession, tempViewName)
     df.createOrReplaceTempView(tempViewName)
   }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala
index 22d0882..f38bb37 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala
@@ -17,34 +17,40 @@
 
 package org.apache.spark.sql.execution.command.management
 
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.types.StringType
 
-import org.apache.carbondata.api.CarbonStore.{getDataAndIndexSize, getLoadStartTime, getLoadTimeTaken, getPartitions, readSegments}
+import org.apache.carbondata.api.CarbonStore.{getDataAndIndexSize, getLoadStartTime, getLoadTimeTaken, getPartitions, readSegments, readStages}
 import org.apache.carbondata.common.Strings
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, StageInput}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
 
 case class CarbonShowSegmentsCommand(
     databaseNameOp: Option[String],
     tableName: String,
-    limit: Option[String],
-    showHistory: Boolean = false)
+    limit: Option[Int],
+    showHistory: Boolean = false,
+    withStage: Boolean = false)
   extends DataCommand {
 
   // add new columns of show segments at last
   override def output: Seq[Attribute] = {
     Seq(
-      AttributeReference("ID", StringType, nullable = false)(),
+      AttributeReference("ID", StringType, nullable = true)(),
       AttributeReference("Status", StringType, nullable = false)(),
       AttributeReference("Load Start Time", StringType, nullable = false)(),
       AttributeReference("Load Time Taken", StringType, nullable = true)(),
       AttributeReference("Partition", StringType, nullable = true)(),
       AttributeReference("Data Size", StringType, nullable = false)(),
       AttributeReference("Index Size", StringType, nullable = false)(),
-      AttributeReference("File Format", StringType, nullable = false)())
+      AttributeReference("File Format", StringType, nullable = true)())
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
@@ -55,12 +61,13 @@
       throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
     val tablePath = carbonTable.getTablePath
-    val segments = readSegments(tablePath, showHistory, limit)
-    if (segments.nonEmpty) {
-      showBasic(segments, tablePath)
-    } else {
-      Seq.empty
+    var rows: Seq[Row] = Seq()
+    if (withStage) {
+      rows = CarbonShowSegmentsCommand.showStages(tablePath)
     }
+
+    val segments = readSegments(tablePath, showHistory, limit)
+    rows ++ showBasic(segments, tablePath)
   }
 
   override protected def opName: String = "SHOW SEGMENTS"
@@ -93,3 +100,87 @@
       }.toSeq
   }
 }
+
+object CarbonShowSegmentsCommand {
+
+  def showStages(tablePath: String): Seq[Row] = {
+    toRows(readStages(tablePath))
+  }
+
+  private def toRows(stages: Seq[StageInput]): Seq[Row] = {
+    var rows = Seq[Row]()
+    stages.foreach(
+      stage =>
+        rows = rows ++ toRows(stage)
+    )
+    rows
+  }
+
+  private def toRows(stage: StageInput): Seq[Row] = {
+    if (stage.getFiles != null) {
+      // Non-partition stage
+      Seq(
+        Row(
+          null,
+          stage.getStatus.toString,
+          new java.sql.Timestamp(stage.getCreateTime).toString,
+          null,
+          "NA",
+          countDataFileSize(stage.getFiles).toString,
+          countIndexFileSize(stage.getFiles).toString,
+          null)
+      )
+    } else {
+      // Partition stage
+      var partitionString: String = ""
+      var dataFileSize: Long = 0
+      var indexFileSize: Long = 0
+      stage.getLocations.asScala.map{
+        location =>
+          val partitions = location.getPartitions.asScala
+          partitionString = if (partitions.size == 1) {
+            partitionString + partitions.head._1 + "=" + partitions.head._2 + ","
+          } else if (partitions.size > 1) {
+            partitionString + partitions.head._1 + "=" + partitions.head._2 + ", ..."
+          } else {
+            "NA"
+          }
+          dataFileSize += countDataFileSize(location.getFiles)
+          indexFileSize += countIndexFileSize(location.getFiles)
+      }
+      Seq(Row(
+        null,
+        stage.getStatus.toString,
+        new java.sql.Timestamp(stage.getCreateTime).toString,
+        null,
+        partitionString,
+        dataFileSize.toString,
+        indexFileSize.toString,
+        null))
+    }
+  }
+
+  private def countDataFileSize(files: java.util.Map[java.lang.String, java.lang.Long]): Long = {
+    var fileSize: Long = 0
+    import scala.collection.JavaConverters._
+    files.asScala.foreach(
+      file =>
+        if (file._1.endsWith(CarbonTablePath.CARBON_DATA_EXT)) {
+          fileSize += file._2
+        }
+    )
+    fileSize
+  }
+
+  private def countIndexFileSize(files: java.util.Map[java.lang.String, java.lang.Long]): Long = {
+    var fileSize: Long = 0
+    files.asScala.foreach(
+      file =>
+        if (file._1.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
+          file._1.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+          fileSize += file._2
+        }
+    )
+    fileSize
+  }
+}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 1353919..b101257 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -541,21 +541,23 @@
    */
   protected lazy val showSegments: Parser[LogicalPlan] =
     (SHOW ~> opt(HISTORY) <~ SEGMENTS <~ ((FOR <~ TABLE) | ON)) ~ (ident <~ ".").? ~ ident ~
-    (LIMIT ~> numericLit).? ~ (AS  ~> restInput).? <~ opt(";") ^^ {
-      case showHistory ~ databaseName ~ tableName ~ limit ~ queryOp =>
+      opt(WITH <~ STAGE) ~ (LIMIT ~> numericLit).? ~ (AS  ~> restInput).? <~ opt(";") ^^ {
+      case showHistory ~ databaseName ~ tableName ~ withStage ~ limit ~ queryOp =>
         if (queryOp.isEmpty) {
           CarbonShowSegmentsCommand(
             CarbonParserUtil.convertDbNameToLowerCase(databaseName),
             tableName.toLowerCase(),
-            limit,
-            showHistory.isDefined)
+            if (limit.isDefined) Some(Integer.valueOf(limit.get)) else None,
+            showHistory.isDefined,
+            withStage.isDefined)
         } else {
           CarbonShowSegmentsAsSelectCommand(
             CarbonParserUtil.convertDbNameToLowerCase(databaseName),
             tableName.toLowerCase(),
             queryOp.get,
-            limit,
-            showHistory.isDefined)
+            if (limit.isDefined) Some(Integer.valueOf(limit.get)) else None,
+            showHistory.isDefined,
+            withStage.isDefined)
         }
     }