[CARBONDATA-3863]after using index service clean the temp data

Why is this PR needed?
each query that use index server will create a folder under
/tmp/indexservertmp. but when query finished the folder will not
be delete. therefore as the number of queries increases, the
folders in /tmp/indexservertmp will increased. then will get
the directory item limit.

What changes were proposed in this PR?
after query finished delete the folder that created.
clean the /tmp/indexservertmp after index server restart.
run a thread that will delete the folder in /tmp/indexservertmp
that has existed for more than 3 hours.

This closes #3855
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 8f68bf2..2925e76 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2456,4 +2456,16 @@
    * property which defines the insert stage flow
    */
   public static final String IS_INSERT_STAGE = "is_insert_stage";
+
+  /**
+   * index server temp folder aging period
+   */
+  @CarbonProperty
+  public static final String CARBON_INDEXSERVER_TEMPFOLDER_DELETETIME =
+          "carbon.indexserver.tempfolder.deletetime";
+
+  /**
+   * index server temp folder aging period default value 3hours.
+   */
+  public static final String CARBON_INDEXSERVER_TEMPFOLDER_DELETETIME_DEFAULT = "10800000";
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index f5cb539..9278421 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -25,8 +25,10 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -605,4 +607,18 @@
   public long getLength() throws IOException {
     return fileSystem.getFileStatus(path).getLen();
   }
+
+  @Override
+  public List<CarbonFile> listDirs() throws IOException {
+    FileStatus[] listStatus = null;
+    if (null != fileStatus && fileStatus.isDirectory()) {
+      Path path = fileStatus.getPath();
+      listStatus = fileSystem.listStatus(path);
+      CarbonFile[] dirs = getFiles(listStatus);
+      List<CarbonFile> result = new ArrayList<CarbonFile>(Arrays.asList(dirs));
+      return result.stream().filter(x -> x.isDirectory()).collect(Collectors.toList());
+    } else {
+      return new ArrayList<CarbonFile>();
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
index e8e86f0..1b439e1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
@@ -39,6 +39,8 @@
 
   List<CarbonFile> listFiles(boolean recursive, CarbonFileFilter fileFilter) throws IOException;
 
+  List<CarbonFile> listDirs() throws IOException;
+
   /**
    * It returns list of files with location details.
    */
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index 81b34db..9cc4421 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -484,4 +484,20 @@
   public int hashCode() {
     return Objects.hash(file.getAbsolutePath());
   }
+
+  @Override
+  public List<CarbonFile> listDirs() throws IOException {
+    if (!file.isDirectory()) {
+      return new ArrayList<CarbonFile>();
+    }
+    File[] files = file.listFiles();
+    if (null == files) {
+      return new ArrayList<CarbonFile>();
+    }
+    List<CarbonFile> carbonFiles = new ArrayList<CarbonFile>();
+    for (File value : files) {
+      carbonFiles.add(new LocalCarbonFile(value));
+    }
+    return carbonFiles;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 6d1874b..b2d6e83 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -721,4 +721,14 @@
     FileFactory.createDirectoryAndSetPermission(directory.getCanonicalPath(), permission);
   }
 
+  /**
+   * get the carbon folder list
+   *
+   * @param path folder path
+   * @throws IOException if error occurs
+   */
+  public static List<CarbonFile> getFolderList(String path) throws IOException {
+    return getCarbonFile(path, getConfiguration()).listDirs();
+  }
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 12d96e9..f32c6c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -3377,4 +3377,60 @@
     }
     return Integer.parseInt(cacheExpirationTime);
   }
+
+  /**
+   * delete the folder depend on the queryId
+   * @param queryId query id
+   * @throws IOException
+   */
+  public static void deleteTempFolderForIndexServer(String queryId)
+          throws IOException {
+    final String path = getIndexServerTempPath();
+    if (queryId == null) {
+      return;
+    }
+    String pathName = path + CarbonCommonConstants.FILE_SEPARATOR + queryId;
+    String indexServerTmpDirPath = CarbonUtil
+            .checkAndAppendFileSystemURIScheme(pathName);
+    if (!FileFactory
+            .deleteFile(indexServerTmpDirPath)) {
+      LOGGER.info("Unable to delete directory: " + pathName);
+    } else {
+      LOGGER.info("Successfully delete directory: " + pathName);
+    }
+  }
+
+  /**
+   * use to clean the tmp folder, avoid exceeding the limit on the number of files.
+   */
+  public static void cleanTempFolderForIndexServer()throws IOException  {
+    final String folderPath = getIndexServerTempPath();
+    String indexServerTmpDirPath = CarbonUtil
+            .checkAndAppendFileSystemURIScheme(folderPath);
+    if (FileFactory.deleteFile(indexServerTmpDirPath)) {
+      LOGGER.info("Complete " + indexServerTmpDirPath + " file cleanup.");
+    } else {
+      LOGGER.info("Failed " + indexServerTmpDirPath + " file cleanup.");
+    }
+  }
+
+  /**
+   * use to clean the tmp folder, avoid exceeding the limit on the number of files.
+   */
+  public static void agingTempFolderForIndexServer(long agingTime)throws
+          IOException, InterruptedException {
+    final String folderPath = getIndexServerTempPath();
+    if (FileFactory.isFileExist(folderPath)) {
+      List<CarbonFile> carbonFileList = FileFactory.getFolderList(folderPath);
+      carbonFileList.stream().filter(carbonFile -> carbonFile.getLastModifiedTime() < agingTime
+      ).forEach(delFile -> {
+        try {
+          deleteFoldersAndFiles(delFile);
+          LOGGER.info("delete file + " + delFile.getPath());
+        } catch (IOException | InterruptedException e) {
+          LOGGER.error("aging temp folder for index server failed.");
+        }
+      });
+    }
+  }
 }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala
index 8d927ea..6957a60 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala
@@ -69,6 +69,12 @@
         if (null != splitFolderPath && !splitFolderPath.deleteFile()) {
           LOGGER.error("Problem while deleting the temp directory:"
             + splitFolderPath.getAbsolutePath)
+        } else {
+          // if the path build with getQueryId already exists,
+          // the splitFolderPath should be null, need delete
+          if (null == splitFolderPath) {
+            CarbonUtil.deleteTempFolderForIndexServer(indexFormat.getQueryId)
+          }
         }
       }
     }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 4eab3e4..9798e80 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -19,7 +19,7 @@
 import java.net.InetSocketAddress
 import java.security.PrivilegedAction
 import java.util.UUID
-import java.util.concurrent.{Executors, ExecutorService}
+import java.util.concurrent.{Executors, ExecutorService, ScheduledExecutorService, TimeUnit}
 
 import scala.collection.JavaConverters._
 
@@ -39,7 +39,7 @@
 import org.apache.carbondata.core.index.IndexInputFormat
 import org.apache.carbondata.core.indexstore.{ExtendedBlockletWrapperContainer, SegmentWrapperContainer}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.events.{IndexServerEvent, OperationContext, OperationListenerBus}
 
 @ProtocolInfo(protocolName = "org.apache.carbondata.indexserver.ServerInterface",
@@ -102,6 +102,10 @@
       .getProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE) != null
   private val operationContext: OperationContext = new OperationContext
 
+  private val agePeriod: String = CarbonProperties.getInstance
+    .getProperty(CarbonCommonConstants.CARBON_INDEXSERVER_TEMPFOLDER_DELETETIME,
+      CarbonCommonConstants.CARBON_INDEXSERVER_TEMPFOLDER_DELETETIME_DEFAULT)
+
   /**
    * Perform the operation 'f' on behalf of the login user.
    */
@@ -269,6 +273,10 @@
         .CARBON_ENABLE_INDEX_SERVER, "true")
       CarbonProperties.getInstance().addNonSerializableProperty(CarbonCommonConstants
         .IS_DRIVER_INSTANCE, "true")
+      // when restart index service clean the tmp folder
+      CarbonUtil.cleanTempFolderForIndexServer()
+      // create a thread to aging the temp folder
+      indexTempFolderCleanUpScheduleThread()
       LOGGER.info(s"Index cache server running on ${ server.getPort } port")
     }
   }
@@ -316,4 +324,17 @@
       Array(new Service("security.indexserver.protocol.acl", classOf[ServerInterface]))
     }
   }
+
+  def indexTempFolderCleanUpScheduleThread(): Unit = {
+    val runnable = new Runnable() {
+      def run() {
+        val age = System.currentTimeMillis() - agePeriod.toLong
+        CarbonUtil.agingTempFolderForIndexServer(age)
+        LOGGER.info(s"Complete age temp folder ${CarbonUtil.getIndexServerTempPath}")
+      }
+    }
+    val ags: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor
+    ags.scheduleAtFixedRate(runnable, 1000, 3600000, TimeUnit.MILLISECONDS)
+    LOGGER.info("index server temp folders aging thread start")
+  }
 }
diff --git a/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala b/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
index 35275f5..8f4da6a 100644
--- a/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
+++ b/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
@@ -1,15 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.indexserver
 
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
-
+import mockit.{Mock, MockUp}
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.scalatest.{BeforeAndAfterEach, FunSuite}
-
-import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.index.{IndexInputFormat, Segment}
 import org.apache.carbondata.core.index.dev.expr.IndexInputSplitWrapper
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexInputSplit
-import org.apache.carbondata.indexserver.DistributedRDDUtils
+import org.apache.carbondata.indexserver.{DistributedIndexJob, DistributedRDDUtils}
+import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
 
 class DistributedRDDUtilsTest extends FunSuite with BeforeAndAfterEach {
 
@@ -18,9 +36,12 @@
 
   val tableCache: ConcurrentHashMap[String, ConcurrentHashMap[String, String]] = DistributedRDDUtils.tableToExecutorMapping
 
+  val indexServerTempFolder = "file:////tmp/indexservertmp/"
+
   override protected def beforeEach(): Unit = {
     executorCache.clear()
     tableCache.clear()
+    FileFactory.deleteFile(indexServerTempFolder)
     buildTestData
   }
 
@@ -112,4 +133,55 @@
       a => a._2.values().asScala.foreach(size => assert(size > 27500 && size < 28000))
     }
   }
+
+  test("Test file create and delete when query") {
+    val distributedRDDUtilsTest = new DistributedIndexJob()
+
+    val mockDataMapFormat = new MockUp[IndexInputFormat]() {
+      @Mock
+      def getQueryId: String = {
+        "a885a111-439f-4b91-ad81-f0bd48164b84"
+      }
+    }
+    try{
+      distributedRDDUtilsTest.execute(mockDataMapFormat.getMockInstance)
+    } catch {
+      case ex: Exception =>
+    }
+    val tmpPath = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84"
+    assert(!FileFactory.isFileExist(tmpPath))
+    assert(FileFactory.isFileExist(indexServerTempFolder))
+  }
+
+  test("Test file create and delete when query the getQueryId path is exists") {
+    val distributedRDDUtilsTest = new DistributedIndexJob()
+    val tmpPath = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84"
+    val newPath = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84/ip1"
+    val newFile = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84/ip1/as1"
+    val tmpPathAnother = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b8412"
+    FileFactory.createDirectoryAndSetPermission(tmpPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+    FileFactory.createDirectoryAndSetPermission(newPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+    FileFactory.createNewFile(newFile, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+    FileFactory.createDirectoryAndSetPermission(tmpPathAnother, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+
+    assert(FileFactory.isFileExist(newFile))
+    assert(FileFactory.isFileExist(tmpPath))
+    assert(FileFactory.isFileExist(newPath))
+    assert(FileFactory.isFileExist(tmpPathAnother))
+
+    val mockDataMapFormat = new MockUp[IndexInputFormat]() {
+      @Mock
+      def getQueryId: String = {
+        "a885a111-439f-4b91-ad81-f0bd48164b84"
+      }
+    }
+    try{
+      distributedRDDUtilsTest.execute(mockDataMapFormat.getMockInstance)
+    } catch {
+      case ex: Exception =>
+    }
+    assert(!FileFactory.isFileExist(tmpPath))
+    assert(FileFactory.isFileExist(indexServerTempFolder))
+    assert(FileFactory.isFileExist(tmpPathAnother))
+  }
 }
diff --git a/integration/spark/src/test/scala/org/apache/indexserver/IndexServerTest.scala b/integration/spark/src/test/scala/org/apache/indexserver/IndexServerTest.scala
new file mode 100644
index 0000000..0b1e346
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/indexserver/IndexServerTest.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.indexserver
+
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
+import org.scalatest.{BeforeAndAfterEach, FunSuite}
+
+
+class IndexServerTest extends FunSuite with BeforeAndAfterEach {
+  val folderPath = CarbonUtil.getIndexServerTempPath
+
+  override protected def beforeEach(): Unit = {
+    if (FileFactory.isFileExist(folderPath)) {
+      FileFactory.deleteFile(folderPath)
+    }
+    FileFactory.createDirectoryAndSetPermission(folderPath,
+      new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+  }
+  override protected def afterEach(): Unit = {
+    if (FileFactory.isFileExist(folderPath)) {
+      FileFactory.deleteFile(folderPath)
+    }
+  }
+
+  test("test clean tmp folder when restart") {
+    val folderPath = CarbonUtil.getIndexServerTempPath
+    assert(FileFactory.isFileExist(folderPath))
+    CarbonUtil.cleanTempFolderForIndexServer()
+    assert(!FileFactory.isFileExist(folderPath))
+  }
+
+  test("test age tmp folder as some period") {
+    val folderPath = CarbonUtil.getIndexServerTempPath
+    val tmpPath = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84"
+    val newPath = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84/ip1"
+    val newFile = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84/ip1/as1"
+    val tmpPathII = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b8411"
+    val tmpPathIII = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84121"
+    val tmpPathV = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84121V"
+    FileFactory.createDirectoryAndSetPermission(tmpPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+    FileFactory.createDirectoryAndSetPermission(tmpPathII, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+    FileFactory.createDirectoryAndSetPermission(tmpPathIII, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+    FileFactory.createDirectoryAndSetPermission(newPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+    FileFactory.createNewFile(newFile)
+    Thread.sleep(5000)
+    val age = System.currentTimeMillis() - 3000
+    FileFactory.createDirectoryAndSetPermission(tmpPathV, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+    CarbonUtil.agingTempFolderForIndexServer(age)
+    assert(!FileFactory.isFileExist(tmpPath))
+    System.out.println(folderPath)
+    assert(FileFactory.isFileExist(folderPath))
+    assert(FileFactory.isFileExist(tmpPathV))
+  }
+
+
+}