[CARBONDATA-3863]after using index service clean the temp data
Why is this PR needed?
each query that use index server will create a folder under
/tmp/indexservertmp. but when query finished the folder will not
be delete. therefore as the number of queries increases, the
folders in /tmp/indexservertmp will increased. then will get
the directory item limit.
What changes were proposed in this PR?
after query finished delete the folder that created.
clean the /tmp/indexservertmp after index server restart.
run a thread that will delete the folder in /tmp/indexservertmp
that has existed for more than 3 hours.
This closes #3855
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 8f68bf2..2925e76 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2456,4 +2456,16 @@
* property which defines the insert stage flow
*/
public static final String IS_INSERT_STAGE = "is_insert_stage";
+
+ /**
+ * index server temp folder aging period
+ */
+ @CarbonProperty
+ public static final String CARBON_INDEXSERVER_TEMPFOLDER_DELETETIME =
+ "carbon.indexserver.tempfolder.deletetime";
+
+ /**
+ * index server temp folder aging period default value 3hours.
+ */
+ public static final String CARBON_INDEXSERVER_TEMPFOLDER_DELETETIME_DEFAULT = "10800000";
}
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index f5cb539..9278421 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -25,8 +25,10 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -605,4 +607,18 @@
public long getLength() throws IOException {
return fileSystem.getFileStatus(path).getLen();
}
+
+ @Override
+ public List<CarbonFile> listDirs() throws IOException {
+ FileStatus[] listStatus = null;
+ if (null != fileStatus && fileStatus.isDirectory()) {
+ Path path = fileStatus.getPath();
+ listStatus = fileSystem.listStatus(path);
+ CarbonFile[] dirs = getFiles(listStatus);
+ List<CarbonFile> result = new ArrayList<CarbonFile>(Arrays.asList(dirs));
+ return result.stream().filter(x -> x.isDirectory()).collect(Collectors.toList());
+ } else {
+ return new ArrayList<CarbonFile>();
+ }
+ }
}
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
index e8e86f0..1b439e1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
@@ -39,6 +39,8 @@
List<CarbonFile> listFiles(boolean recursive, CarbonFileFilter fileFilter) throws IOException;
+ List<CarbonFile> listDirs() throws IOException;
+
/**
* It returns list of files with location details.
*/
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index 81b34db..9cc4421 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -484,4 +484,20 @@
public int hashCode() {
return Objects.hash(file.getAbsolutePath());
}
+
+ @Override
+ public List<CarbonFile> listDirs() throws IOException {
+ if (!file.isDirectory()) {
+ return new ArrayList<CarbonFile>();
+ }
+ File[] files = file.listFiles();
+ if (null == files) {
+ return new ArrayList<CarbonFile>();
+ }
+ List<CarbonFile> carbonFiles = new ArrayList<CarbonFile>();
+ for (File value : files) {
+ carbonFiles.add(new LocalCarbonFile(value));
+ }
+ return carbonFiles;
+ }
}
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 6d1874b..b2d6e83 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -721,4 +721,14 @@
FileFactory.createDirectoryAndSetPermission(directory.getCanonicalPath(), permission);
}
+ /**
+ * get the carbon folder list
+ *
+ * @param path folder path
+ * @throws IOException if error occurs
+ */
+ public static List<CarbonFile> getFolderList(String path) throws IOException {
+ return getCarbonFile(path, getConfiguration()).listDirs();
+ }
+
}
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 12d96e9..f32c6c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -3377,4 +3377,60 @@
}
return Integer.parseInt(cacheExpirationTime);
}
+
+ /**
+ * delete the folder depend on the queryId
+ * @param queryId query id
+ * @throws IOException
+ */
+ public static void deleteTempFolderForIndexServer(String queryId)
+ throws IOException {
+ final String path = getIndexServerTempPath();
+ if (queryId == null) {
+ return;
+ }
+ String pathName = path + CarbonCommonConstants.FILE_SEPARATOR + queryId;
+ String indexServerTmpDirPath = CarbonUtil
+ .checkAndAppendFileSystemURIScheme(pathName);
+ if (!FileFactory
+ .deleteFile(indexServerTmpDirPath)) {
+ LOGGER.info("Unable to delete directory: " + pathName);
+ } else {
+ LOGGER.info("Successfully delete directory: " + pathName);
+ }
+ }
+
+ /**
+ * use to clean the tmp folder, avoid exceeding the limit on the number of files.
+ */
+ public static void cleanTempFolderForIndexServer()throws IOException {
+ final String folderPath = getIndexServerTempPath();
+ String indexServerTmpDirPath = CarbonUtil
+ .checkAndAppendFileSystemURIScheme(folderPath);
+ if (FileFactory.deleteFile(indexServerTmpDirPath)) {
+ LOGGER.info("Complete " + indexServerTmpDirPath + " file cleanup.");
+ } else {
+ LOGGER.info("Failed " + indexServerTmpDirPath + " file cleanup.");
+ }
+ }
+
+ /**
+ * use to clean the tmp folder, avoid exceeding the limit on the number of files.
+ */
+ public static void agingTempFolderForIndexServer(long agingTime)throws
+ IOException, InterruptedException {
+ final String folderPath = getIndexServerTempPath();
+ if (FileFactory.isFileExist(folderPath)) {
+ List<CarbonFile> carbonFileList = FileFactory.getFolderList(folderPath);
+ carbonFileList.stream().filter(carbonFile -> carbonFile.getLastModifiedTime() < agingTime
+ ).forEach(delFile -> {
+ try {
+ deleteFoldersAndFiles(delFile);
+ LOGGER.info("delete file + " + delFile.getPath());
+ } catch (IOException | InterruptedException e) {
+ LOGGER.error("aging temp folder for index server failed.");
+ }
+ });
+ }
+ }
}
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala
index 8d927ea..6957a60 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala
@@ -69,6 +69,12 @@
if (null != splitFolderPath && !splitFolderPath.deleteFile()) {
LOGGER.error("Problem while deleting the temp directory:"
+ splitFolderPath.getAbsolutePath)
+ } else {
+ // if the path build with getQueryId already exists,
+ // the splitFolderPath should be null, need delete
+ if (null == splitFolderPath) {
+ CarbonUtil.deleteTempFolderForIndexServer(indexFormat.getQueryId)
+ }
}
}
}
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 4eab3e4..9798e80 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -19,7 +19,7 @@
import java.net.InetSocketAddress
import java.security.PrivilegedAction
import java.util.UUID
-import java.util.concurrent.{Executors, ExecutorService}
+import java.util.concurrent.{Executors, ExecutorService, ScheduledExecutorService, TimeUnit}
import scala.collection.JavaConverters._
@@ -39,7 +39,7 @@
import org.apache.carbondata.core.index.IndexInputFormat
import org.apache.carbondata.core.indexstore.{ExtendedBlockletWrapperContainer, SegmentWrapperContainer}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.events.{IndexServerEvent, OperationContext, OperationListenerBus}
@ProtocolInfo(protocolName = "org.apache.carbondata.indexserver.ServerInterface",
@@ -102,6 +102,10 @@
.getProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE) != null
private val operationContext: OperationContext = new OperationContext
+ private val agePeriod: String = CarbonProperties.getInstance
+ .getProperty(CarbonCommonConstants.CARBON_INDEXSERVER_TEMPFOLDER_DELETETIME,
+ CarbonCommonConstants.CARBON_INDEXSERVER_TEMPFOLDER_DELETETIME_DEFAULT)
+
/**
* Perform the operation 'f' on behalf of the login user.
*/
@@ -269,6 +273,10 @@
.CARBON_ENABLE_INDEX_SERVER, "true")
CarbonProperties.getInstance().addNonSerializableProperty(CarbonCommonConstants
.IS_DRIVER_INSTANCE, "true")
+ // when restart index service clean the tmp folder
+ CarbonUtil.cleanTempFolderForIndexServer()
+ // create a thread to aging the temp folder
+ indexTempFolderCleanUpScheduleThread()
LOGGER.info(s"Index cache server running on ${ server.getPort } port")
}
}
@@ -316,4 +324,17 @@
Array(new Service("security.indexserver.protocol.acl", classOf[ServerInterface]))
}
}
+
+ def indexTempFolderCleanUpScheduleThread(): Unit = {
+ val runnable = new Runnable() {
+ def run() {
+ val age = System.currentTimeMillis() - agePeriod.toLong
+ CarbonUtil.agingTempFolderForIndexServer(age)
+ LOGGER.info(s"Complete age temp folder ${CarbonUtil.getIndexServerTempPath}")
+ }
+ }
+ val ags: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor
+ ags.scheduleAtFixedRate(runnable, 1000, 3600000, TimeUnit.MILLISECONDS)
+ LOGGER.info("index server temp folders aging thread start")
+ }
}
diff --git a/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala b/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
index 35275f5..8f4da6a 100644
--- a/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
+++ b/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
@@ -1,15 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.indexserver
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
-
+import mockit.{Mock, MockUp}
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.scalatest.{BeforeAndAfterEach, FunSuite}
-
-import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.index.{IndexInputFormat, Segment}
import org.apache.carbondata.core.index.dev.expr.IndexInputSplitWrapper
import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexInputSplit
-import org.apache.carbondata.indexserver.DistributedRDDUtils
+import org.apache.carbondata.indexserver.{DistributedIndexJob, DistributedRDDUtils}
+import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
class DistributedRDDUtilsTest extends FunSuite with BeforeAndAfterEach {
@@ -18,9 +36,12 @@
val tableCache: ConcurrentHashMap[String, ConcurrentHashMap[String, String]] = DistributedRDDUtils.tableToExecutorMapping
+ val indexServerTempFolder = "file:////tmp/indexservertmp/"
+
override protected def beforeEach(): Unit = {
executorCache.clear()
tableCache.clear()
+ FileFactory.deleteFile(indexServerTempFolder)
buildTestData
}
@@ -112,4 +133,55 @@
a => a._2.values().asScala.foreach(size => assert(size > 27500 && size < 28000))
}
}
+
+ test("Test file create and delete when query") {
+ val distributedRDDUtilsTest = new DistributedIndexJob()
+
+ val mockDataMapFormat = new MockUp[IndexInputFormat]() {
+ @Mock
+ def getQueryId: String = {
+ "a885a111-439f-4b91-ad81-f0bd48164b84"
+ }
+ }
+ try{
+ distributedRDDUtilsTest.execute(mockDataMapFormat.getMockInstance)
+ } catch {
+ case ex: Exception =>
+ }
+ val tmpPath = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84"
+ assert(!FileFactory.isFileExist(tmpPath))
+ assert(FileFactory.isFileExist(indexServerTempFolder))
+ }
+
+ test("Test file create and delete when query the getQueryId path is exists") {
+ val distributedRDDUtilsTest = new DistributedIndexJob()
+ val tmpPath = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84"
+ val newPath = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84/ip1"
+ val newFile = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84/ip1/as1"
+ val tmpPathAnother = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b8412"
+ FileFactory.createDirectoryAndSetPermission(tmpPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+ FileFactory.createDirectoryAndSetPermission(newPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+ FileFactory.createNewFile(newFile, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+ FileFactory.createDirectoryAndSetPermission(tmpPathAnother, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+
+ assert(FileFactory.isFileExist(newFile))
+ assert(FileFactory.isFileExist(tmpPath))
+ assert(FileFactory.isFileExist(newPath))
+ assert(FileFactory.isFileExist(tmpPathAnother))
+
+ val mockDataMapFormat = new MockUp[IndexInputFormat]() {
+ @Mock
+ def getQueryId: String = {
+ "a885a111-439f-4b91-ad81-f0bd48164b84"
+ }
+ }
+ try{
+ distributedRDDUtilsTest.execute(mockDataMapFormat.getMockInstance)
+ } catch {
+ case ex: Exception =>
+ }
+ assert(!FileFactory.isFileExist(tmpPath))
+ assert(FileFactory.isFileExist(indexServerTempFolder))
+ assert(FileFactory.isFileExist(tmpPathAnother))
+ }
}
diff --git a/integration/spark/src/test/scala/org/apache/indexserver/IndexServerTest.scala b/integration/spark/src/test/scala/org/apache/indexserver/IndexServerTest.scala
new file mode 100644
index 0000000..0b1e346
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/indexserver/IndexServerTest.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.indexserver
+
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
+import org.scalatest.{BeforeAndAfterEach, FunSuite}
+
+
+class IndexServerTest extends FunSuite with BeforeAndAfterEach {
+ val folderPath = CarbonUtil.getIndexServerTempPath
+
+ override protected def beforeEach(): Unit = {
+ if (FileFactory.isFileExist(folderPath)) {
+ FileFactory.deleteFile(folderPath)
+ }
+ FileFactory.createDirectoryAndSetPermission(folderPath,
+ new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+ }
+ override protected def afterEach(): Unit = {
+ if (FileFactory.isFileExist(folderPath)) {
+ FileFactory.deleteFile(folderPath)
+ }
+ }
+
+ test("test clean tmp folder when restart") {
+ val folderPath = CarbonUtil.getIndexServerTempPath
+ assert(FileFactory.isFileExist(folderPath))
+ CarbonUtil.cleanTempFolderForIndexServer()
+ assert(!FileFactory.isFileExist(folderPath))
+ }
+
+ test("test age tmp folder as some period") {
+ val folderPath = CarbonUtil.getIndexServerTempPath
+ val tmpPath = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84"
+ val newPath = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84/ip1"
+ val newFile = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84/ip1/as1"
+ val tmpPathII = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b8411"
+ val tmpPathIII = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84121"
+ val tmpPathV = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84121V"
+ FileFactory.createDirectoryAndSetPermission(tmpPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+ FileFactory.createDirectoryAndSetPermission(tmpPathII, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+ FileFactory.createDirectoryAndSetPermission(tmpPathIII, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+ FileFactory.createDirectoryAndSetPermission(newPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+ FileFactory.createNewFile(newFile)
+ Thread.sleep(5000)
+ val age = System.currentTimeMillis() - 3000
+ FileFactory.createDirectoryAndSetPermission(tmpPathV, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+ CarbonUtil.agingTempFolderForIndexServer(age)
+ assert(!FileFactory.isFileExist(tmpPath))
+ System.out.println(folderPath)
+ assert(FileFactory.isFileExist(folderPath))
+ assert(FileFactory.isFileExist(tmpPathV))
+ }
+
+
+}