blob: 1984d6cb8045eb1492ea3547bb93ca9ad0958729 [file] [log] [blame]
package ittest.io.pivotal.gemfire.spark.connector.testkit
import java.io.{IOException, File}
import java.net.InetAddress
import java.util.Properties
import org.apache.commons.httpclient.HttpClient
import org.apache.commons.io.FileUtils
import org.apache.commons.io.filefilter.IOFileFilter
/**
* A class that manages GemFire locator and servers. Uses gfsh to
* start and stop the locator and servers.
*/
class GemFireRunner(settings: Properties) {
private def gemfireHome: String = scala.util.Properties.envOrNone("GEMFIRE") match {
case Some(home) => home
case None => throw new RuntimeException("No GEMFIRE environment variable set in environment.")
}
val gfshCmd = new File(gemfireHome, "bin/gfsh").toString
val cacheXMLFile = settings.get("cache-xml-file")
val numServers: Int = settings.get("num-of-servers").asInstanceOf[String].toInt
val cwd = new File(".").getAbsolutePath
val gemfireFunctionsTargetDir = new File("../gemfire-functions/target")
val testroot = "target/testgemfire"
val classpath = new File(cwd, "target/scala-2.10/it-classes/")
val locatorPort = startGemFireCluster(numServers)
def getLocatorPort: Int = locatorPort
private def startGemFireCluster(numServers: Int): Int = {
//ports(0) for GemFire locator, the other ports are for GemFire servers
val ports: Seq[Int] = IOUtils.getRandomAvailableTCPPorts(2 + numServers)
startGemFireLocator(ports(0), ports(1))
startGemFireServers(ports(0), ports.drop(2))
registerFunctions(ports(1))
ports(0)
}
private def startGemFireLocator(locatorPort: Int, jmxHttpPort:Int) {
println(s"=== GemFireRunner: starting locator on port $locatorPort")
val locatorDir = new File(cwd, s"$testroot/locator")
if (locatorDir.exists())
FileUtils.deleteDirectory(locatorDir)
IOUtils.mkdir(locatorDir)
new ProcessBuilder()
.command(gfshCmd, "start", "locator",
"--name=locator",
s"--dir=$locatorDir",
s"--port=$locatorPort",
s"--J=-Dgemfire.jmx-manager-http-port=$jmxHttpPort")
.inheritIO()
.start()
// Wait 30 seconds for locator to start
println(s"=== GemFireRunner: waiting for locator on port $locatorPort")
if (!IOUtils.waitForPortOpen(InetAddress.getByName("localhost"), locatorPort, 30000))
throw new IOException("Failed to start GemFire locator.")
println(s"=== GemFireRunner: done waiting for locator on port $locatorPort")
}
private def startGemFireServers(locatorPort: Int, serverPorts: Seq[Int]) {
val procs = for (i <- 0 until serverPorts.length) yield {
println(s"=== GemFireRunner: starting server${i+1} with clientPort ${serverPorts(i)}")
val serverDir = new File(cwd, s"$testroot/server${i+1}")
if (serverDir.exists())
FileUtils.deleteDirectory(serverDir)
IOUtils.mkdir(serverDir)
new ProcessBuilder()
.command(gfshCmd, "start", "server",
s"--name=server${i+1}",
s"--locators=localhost[$locatorPort]",
s"--bind-address=localhost",
s"--server-port=${serverPorts(i)}",
s"--dir=$serverDir",
s"--cache-xml-file=$cacheXMLFile",
s"--classpath=$classpath")
.inheritIO()
.start()
}
procs.foreach(p => p.waitFor)
println(s"All $serverPorts.length servers have been started")
}
private def registerFunctions(jmxHttpPort:Int) {
import scala.collection.JavaConversions._
FileUtils.listFiles(gemfireFunctionsTargetDir, fileFilter, dirFilter).foreach{ f => registerFunction(jmxHttpPort, f)}
}
def fileFilter = new IOFileFilter {
def accept (file: File) = file.getName.endsWith(".jar") && file.getName.startsWith("gemfire-functions")
def accept (dir: File, name: String) = name.endsWith(".jar") && name.startsWith("gemfire-functions")
}
def dirFilter = new IOFileFilter {
def accept (file: File) = file.getName.startsWith("scala")
def accept (dir: File, name: String) = name.startsWith("scala")
}
private def registerFunction(jmxHttpPort:Int, jar:File) {
println("Deploying:" + jar.getName)
import io.pivotal.gemfire.spark.connector.GemFireFunctionDeployer
val deployer = new GemFireFunctionDeployer(new HttpClient())
deployer.deploy("localhost", jmxHttpPort, jar)
}
def stopGemFireCluster(): Unit = {
stopGemFireServers(numServers)
stopGemFireLocator()
if (!IOUtils.waitForPortClose(InetAddress.getByName("localhost"), getLocatorPort, 30000))
throw new IOException(s"Failed to stop GemFire locator at port $getLocatorPort.")
println(s"Successfully stop GemFire locator at port $getLocatorPort.")
}
private def stopGemFireLocator() {
println(s"=== GemFireRunner: stop locator")
val p = new ProcessBuilder()
.inheritIO()
.command(gfshCmd, "stop", "locator", s"--dir=$testroot/locator")
.start()
p.waitFor()
}
private def stopGemFireServers(numServers: Int) {
val procs = for (i <-1 to numServers) yield {
println(s"=== GemFireRunner: stop server $i.")
new ProcessBuilder()
.inheritIO()
.command(gfshCmd, "stop", "server", s"--dir=$testroot/server$i")
.start()
}
procs.foreach(p => p.waitFor())
}
}