blob: b36f7ea8f09d162ddbde8af0634ffa63e54c7197 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.indexserver
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import mockit.{Mock, MockUp}
import org.apache.hadoop.conf.Configuration
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.scalatest.{BeforeAndAfterEach, FunSuite}
import org.apache.carbondata.core.index.{IndexInputFormat, Segment}
import org.apache.carbondata.core.index.dev.expr.IndexInputSplitWrapper
import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexInputSplit
import org.apache.carbondata.indexserver.{DistributedIndexJob, DistributedRDDUtils}
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
class DistributedRDDUtilsTest extends FunSuite with BeforeAndAfterEach {
val executorCache: ConcurrentHashMap[String, ConcurrentHashMap[String, Long]] = DistributedRDDUtils
.executorToCacheSizeMapping
val tableCache: ConcurrentHashMap[String, ConcurrentHashMap[String, String]] = DistributedRDDUtils.tableToExecutorMapping
val indexServerTempFolder = "file:////tmp/indexservertmp/"
override protected def beforeEach(): Unit = {
executorCache.clear()
tableCache.clear()
FileFactory.deleteFile(indexServerTempFolder)
buildTestData
}
def buildTestData {
val tableMap = new ConcurrentHashMap[String, String]
tableMap.put("0" , "IP1_EID1")
tableMap.put("1", "IP1_EID2")
tableCache.put("Table1", tableMap)
val executorMap1 = new ConcurrentHashMap[String, Long]
executorMap1.put("EID1", 1L)
executorMap1.put("EID2", 1L)
val executorMap2 = new ConcurrentHashMap[String, Long]
executorMap2.put("EID1", 1L)
executorMap2.put("EID2", 1L)
executorCache.put("IP1", executorMap1)
executorCache.put("IP2", executorMap2)
}
test("test server mappings when 1 host is dead") {
DistributedRDDUtils.invalidateHosts(Seq("IP1"))
assert(DistributedRDDUtils.executorToCacheSizeMapping.size() == 1)
assert(!DistributedRDDUtils.executorToCacheSizeMapping.containsKey("IP1"))
assert(DistributedRDDUtils.tableToExecutorMapping.get("Table1").size() == 2)
assert(!DistributedRDDUtils.tableToExecutorMapping.get("Table1").values().contains("IP1"))
}
test("test server mappings when all executor hosts are dead") {
DistributedRDDUtils.invalidateHosts(Seq("IP1", "IP2"))
assert(DistributedRDDUtils.executorToCacheSizeMapping.size() == 0)
assert(!DistributedRDDUtils.executorToCacheSizeMapping.containsKey("IP1"))
assert(!DistributedRDDUtils.executorToCacheSizeMapping.containsKey("IP2"))
// table cache may be present because even if the executor comes up it can handle further
// requests. If another executor is up then reassignment will happen.
}
test("test server mappings when 1 executor is dead") {
DistributedRDDUtils.invalidateExecutors(Seq("IP1_EID1"))
assert(DistributedRDDUtils.executorToCacheSizeMapping.size() == 2)
assert(DistributedRDDUtils.executorToCacheSizeMapping.containsKey("IP1"))
assert(!DistributedRDDUtils.executorToCacheSizeMapping.get("IP1").contains("EID1"))
assert(DistributedRDDUtils.tableToExecutorMapping.get("Table1").size() == 2)
assert(!DistributedRDDUtils.tableToExecutorMapping.get("Table1").get("0").equalsIgnoreCase("IP1_EID1"))
}
test("Test distribution for legacy segments") {
val executorList = (0 until 10).map {
host =>
val executorIds = (0 until 2).map {
executor => executor.toString
}
(host.toString, executorIds)
}.toMap
val indexDistributableWrapper = (0 to 5010).map {
i =>
val segment = new Segment(i.toString)
segment.setIndexSize(1)
val blockletIndexDistributable = new BlockletIndexInputSplit(i.toString)
blockletIndexDistributable.setSegment(segment)
new IndexInputSplitWrapper("", blockletIndexDistributable)
}
DistributedRDDUtils
.getExecutors(indexDistributableWrapper.toArray, executorList, "default_table1", 1)
DistributedRDDUtils.executorToCacheSizeMapping.asScala.foreach {
a => a._2.values().asScala.foreach(size => assert(size == 250 || size == 251))
}
}
test("Test distribution for non legacy segments") {
val executorList = (0 until 10).map {
host =>
val executorIds = (0 until 2).map {
executor => executor.toString
}
(host.toString, executorIds)
}.toMap
val indexDistributableWrapper = (0 to 5010).map {
i =>
val segment = new Segment(i.toString)
segment.setIndexSize(111)
val blockletIndexDistributable = new BlockletIndexInputSplit(i.toString)
blockletIndexDistributable.setSegment(segment)
new IndexInputSplitWrapper("", blockletIndexDistributable)
}
DistributedRDDUtils
.getExecutors(indexDistributableWrapper.toArray, executorList, "default_table1", 1)
DistributedRDDUtils.executorToCacheSizeMapping.asScala.foreach {
a => a._2.values().asScala.foreach(size => assert(size > 27500 && size < 28000))
}
}
test("Test file create and delete when query") {
val distributedRDDUtilsTest = new DistributedIndexJob()
val mockDataMapFormat = new MockUp[IndexInputFormat]() {
@Mock
def getQueryId: String = {
"a885a111-439f-4b91-ad81-f0bd48164b84"
}
}
try{
distributedRDDUtilsTest.execute(mockDataMapFormat.getMockInstance, new Configuration())
} catch {
case ex: Exception =>
}
val tmpPath = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84"
assert(!FileFactory.isFileExist(tmpPath))
assert(FileFactory.isFileExist(indexServerTempFolder))
}
test("Test file create and delete when query the getQueryId path is exists") {
val distributedRDDUtilsTest = new DistributedIndexJob()
val tmpPath = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84"
val newPath = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84/ip1"
val newFile = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b84/ip1/as1"
val tmpPathAnother = "file:////tmp/indexservertmp/a885a111-439f-4b91-ad81-f0bd48164b8412"
FileFactory.createDirectoryAndSetPermission(tmpPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
FileFactory.createDirectoryAndSetPermission(newPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
FileFactory.createNewFile(newFile, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
FileFactory.createDirectoryAndSetPermission(tmpPathAnother, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
assert(FileFactory.isFileExist(newFile))
assert(FileFactory.isFileExist(tmpPath))
assert(FileFactory.isFileExist(newPath))
assert(FileFactory.isFileExist(tmpPathAnother))
val mockDataMapFormat = new MockUp[IndexInputFormat]() {
@Mock
def getQueryId: String = {
"a885a111-439f-4b91-ad81-f0bd48164b84"
}
}
try{
distributedRDDUtilsTest.execute(mockDataMapFormat.getMockInstance, new Configuration())
} catch {
case ex: Exception =>
}
assert(!FileFactory.isFileExist(tmpPath))
assert(FileFactory.isFileExist(indexServerTempFolder))
assert(FileFactory.isFileExist(tmpPathAnother))
}
}