[CARBONDATA-3856] Support the LIMIT operator for show segments command

Why is this PR needed?
When the number segments is large, "SHOW SEGMENTS" time overhead is too high. The LIMIT operator shall be supported in the SHOW SEGMENTS command.

What changes were proposed in this PR?
Add LIMIT operator in the grammar parsing and read segments function.

Does this PR introduce any user interface change?
YES

Is any new testcase added?
YES

This closes #3792
diff --git a/docs/segment-management-on-carbondata.md b/docs/segment-management-on-carbondata.md
index d4fe339..3ef0a3a 100644
--- a/docs/segment-management-on-carbondata.md
+++ b/docs/segment-management-on-carbondata.md
@@ -32,7 +32,7 @@
 
   ```
   SHOW [HISTORY] SEGMENTS
-  [FOR TABLE | ON] [db_name.]table_name
+  [FOR TABLE | ON] [db_name.]table_name [LIMIT number_of_segments]
   [AS (select query from table_name_segments)]
   ```
 
@@ -54,6 +54,12 @@
   SHOW SEGMENTS ON CarbonDatabase.CarbonTable
   ```
 
+  Show 10 visible segments with the largest segmentid
+
+  ```
+  SHOW SEGMENTS ON CarbonDatabase.CarbonTable LIMIT 10
+  ```
+
   Show all segments, include invisible segments
   ```
   SHOW HISTORY SEGMENTS ON CarbonDatabase.CarbonTable
@@ -83,6 +89,9 @@
   SHOW SEGMENTS ON CarbonTable AS 
   SELECT * FROM CarbonTable_segments
   
+  SHOW SEGMENTS ON CarbonTable LIMIT 10 AS 
+  SELECT * FROM CarbonTable_segments
+  
   SHOW SEGMENTS ON CarbonTable AS
   SELECT id, dataSize FROM CarbonTable_segments 
   WHERE status='Success' 
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index d86f7be..02d36cf 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -42,16 +42,29 @@
 object CarbonStore {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
-  def readSegments(tablePath: String, showHistory: Boolean): Array[LoadMetadataDetails] = {
+  def readSegments(
+      tablePath: String,
+      showHistory: Boolean,
+      limit: Option[String]): Array[LoadMetadataDetails] = {
     val metaFolder = CarbonTablePath.getMetadataPath(tablePath)
-    val segmentsMetadataDetails = if (showHistory) {
+    var segmentsMetadataDetails = if (showHistory) {
       SegmentStatusManager.readLoadMetadata(metaFolder) ++
       SegmentStatusManager.readLoadHistoryMetadata(metaFolder)
     } else {
       SegmentStatusManager.readLoadMetadata(metaFolder)
     }
     if (!showHistory) {
-      segmentsMetadataDetails.filter(_.getVisibility.equalsIgnoreCase("true"))
+      segmentsMetadataDetails = segmentsMetadataDetails
+        .filter(_.getVisibility.equalsIgnoreCase("true"))
+      segmentsMetadataDetails = segmentsMetadataDetails.sortWith { (l1, l2) =>
+        java.lang.Double.parseDouble(l1.getLoadName) >
+        java.lang.Double.parseDouble(l2.getLoadName)
+      }
+    }
+
+    if (limit.isDefined) {
+      val lim = Integer.parseInt(limit.get)
+      segmentsMetadataDetails.slice(0, lim)
     } else {
       segmentsMetadataDetails
     }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
index 26772dd..7d1c710 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
@@ -35,6 +35,7 @@
     databaseNameOp: Option[String],
     tableName: String,
     query: String,
+    limit: Option[String],
     showHistory: Boolean = false)
   extends DataCommand {
 
@@ -71,7 +72,7 @@
 
   private def createDataFrame: DataFrame = {
     val tablePath = carbonTable.getTablePath
-    val segments = CarbonStore.readSegments(tablePath, showHistory)
+    val segments = CarbonStore.readSegments(tablePath, showHistory, limit)
     val tempViewName = makeTempViewName(carbonTable)
     registerSegmentRowView(sparkSession, tempViewName, carbonTable, segments)
     try {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala
index 53ea021..22d0882 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala
@@ -30,6 +30,7 @@
 case class CarbonShowSegmentsCommand(
     databaseNameOp: Option[String],
     tableName: String,
+    limit: Option[String],
     showHistory: Boolean = false)
   extends DataCommand {
 
@@ -54,7 +55,7 @@
       throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
     val tablePath = carbonTable.getTablePath
-    val segments = readSegments(tablePath, showHistory)
+    val segments = readSegments(tablePath, showHistory, limit)
     if (segments.nonEmpty) {
       showBasic(segments, tablePath)
     } else {
@@ -65,13 +66,8 @@
   override protected def opName: String = "SHOW SEGMENTS"
 
   private def showBasic(
-      allSegments: Array[LoadMetadataDetails],
+      segments: Array[LoadMetadataDetails],
       tablePath: String): Seq[Row] = {
-    val segments = allSegments.sortWith { (l1, l2) =>
-      java.lang.Double.parseDouble(l1.getLoadName) >
-      java.lang.Double.parseDouble(l2.getLoadName)
-    }
-
     segments
       .map { segment =>
         val startTime = getLoadStartTime(segment)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 429aac3..1353919 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -541,18 +541,20 @@
    */
   protected lazy val showSegments: Parser[LogicalPlan] =
     (SHOW ~> opt(HISTORY) <~ SEGMENTS <~ ((FOR <~ TABLE) | ON)) ~ (ident <~ ".").? ~ ident ~
-    (AS  ~> restInput).? <~ opt(";") ^^ {
-      case showHistory ~ databaseName ~ tableName ~ queryOp =>
+    (LIMIT ~> numericLit).? ~ (AS  ~> restInput).? <~ opt(";") ^^ {
+      case showHistory ~ databaseName ~ tableName ~ limit ~ queryOp =>
         if (queryOp.isEmpty) {
           CarbonShowSegmentsCommand(
             CarbonParserUtil.convertDbNameToLowerCase(databaseName),
             tableName.toLowerCase(),
+            limit,
             showHistory.isDefined)
         } else {
           CarbonShowSegmentsAsSelectCommand(
             CarbonParserUtil.convertDbNameToLowerCase(databaseName),
             tableName.toLowerCase(),
             queryOp.get,
+            limit,
             showHistory.isDefined)
         }
     }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala
index 0fbd39c..27de94f 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala
@@ -69,11 +69,22 @@
         | select id, status, datasize from source_segments where status = 'Success' order by dataSize
         |""".stripMargin).collect()
 
+
     assertResult("4.1")(rows(0).get(0))
     assertResult("Success")(rows(0).get(1))
     assertResult("0.2")(rows(1).get(0))
     assertResult("Success")(rows(1).get(1))
 
+    rows = sql(
+      """
+        | show segments on source limit 2 as
+        | select id, status, datasize from source_segments where status = 'Success' order by dataSize
+        |""".stripMargin).collect()
+
+    assertResult("4.1")(rows(0).get(0))
+    assertResult("Success")(rows(0).get(1))
+    assertResult(1)(rows.length)
+
     val tables = sql("show tables").collect()
     assert(!tables.toSeq.exists(_.get(1).equals("source_segments")))
 
@@ -84,7 +95,9 @@
     sql(s"""drop TABLE if exists source""").collect
     sql(s"""CREATE TABLE source (CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string,DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),Double_COLUMN1 double,DECIMAL_COLUMN2 decimal(36,10), Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED AS carbondata TBLPROPERTIES('table_blocksize'='1')""").collect
     checkAnswer(sql("show segments on source"), Seq.empty)
-    val result = sql("show segments on source as select * from source_segments").collect()
+    var result = sql("show segments on source as select * from source_segments").collect()
+    assertResult(0)(result.length)
+    result = sql("show segments on source limit 10 as select * from source_segments").collect()
     assertResult(0)(result.length)
   }
 
@@ -144,6 +157,7 @@
     assert(historyDetail.length == 0)
     sql(s"clean files for table ${tableName}")
     assert(sql(s"show segments on ${tableName}").collect().length == 2)
+    assert(sql(s"show segments on ${tableName} limit 1").collect().length == 1)
     detail = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
     historyDetail = SegmentStatusManager.readLoadHistoryMetadata(carbonTable.getMetadataPath)
     assert(detail.length == 4)
@@ -163,7 +177,7 @@
     sql(s"clean files for table ${tableName}")
     assert(sql(s"show segments on ${tableName} as select * from ${tableName}_segments").collect().length == 2)
     sql(s"show history segments on ${tableName} as select * from ${tableName}_segments").show(false)
-    val segmentsHistoryList = sql(s"show history segments on ${tableName} as select * from ${tableName}_segments").collect()
+    var segmentsHistoryList = sql(s"show history segments on ${tableName} as select * from ${tableName}_segments").collect()
     assert(segmentsHistoryList.length == 10)
     assertResult("0")(segmentsHistoryList(0).getString(0))
     assertResult("Compacted")(segmentsHistoryList(0).getString(1))
@@ -183,6 +197,15 @@
     assertResult("Compacted")(segmentsHistoryList(8).getString(1))
     assertResult("4")(segmentsHistoryList(9).getString(0))
     assertResult("Compacted")(segmentsHistoryList(9).getString(1))
+
+    segmentsHistoryList = sql(s"show history segments on ${tableName} limit 2 as select * from ${tableName}_segments").collect()
+    assert(segmentsHistoryList.length == 2)
+    assertResult("0")(segmentsHistoryList(0).getString(0))
+    assertResult("Compacted")(segmentsHistoryList(0).getString(1))
+    assertResult("0.1")(segmentsHistoryList(0).getString(7))
+    assertResult("0.2")(segmentsHistoryList(1).getString(0))
+    assertResult("Success")(segmentsHistoryList(1).getString(1))
+
     assert(sql(s"show history segments on ${tableName} as select * from ${tableName}_segments limit 3").collect().length == 3)
     dropTable(tableName)
   }
@@ -191,7 +214,11 @@
     sql("drop table if exists a")
     sql("create table a(a string) stored as carbondata")
     sql("insert into a select 'k'")
+    sql("insert into a select 'j'")
+    sql("insert into a select 'k'")
     val rows = sql("show segments for table a").collect()
+    assert(rows.length == 3)
+    assert(sql(s"show segments for table a limit 1").collect().length == 1)
     assert(rows(0).getString(3).replace("S", "").toDouble > 0)
     assert(rows(0).getString(7).equalsIgnoreCase("columnar_v3"))
     sql("drop table if exists a")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
index 8707ec7..4de0761 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
@@ -254,10 +254,12 @@
       val col = df.collect().map{
         row => Row(row.getString(0),row.getString(1),row.getString(7))
       }.toSeq
-      assert(col.equals(Seq(Row("0","Compacted","0.1"),
+      assert(col.equals(Seq(
+        Row("2","Success","NA"),
         Row("1","Compacted","0.1"),
         Row("0.1","Success","NA"),
-        Row("2","Success","NA"))))
+        Row("0","Compacted","0.1")
+       )))
     }
     finally {
       sql("SET carbon.input.segments.default.carbon_table=*")