blob: 54a5de0973581434bda7c365e18694e44998ade2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.metrics
import org.apache.samza.util.{Util, Logging}
import java.rmi.server.RMIServerSocketFactory
import java.net.ServerSocket
import java.rmi.registry.LocateRegistry
import management.ManagementFactory
import java.util
import javax.management.remote.{ JMXConnectorServerFactory, JMXServiceURL }
/**
* Programmatically start the JMX server and its accompanying RMI server. This is necessary in order to reliably
* and - heh - simply request and know a dynamic port such that processes on the same machine do not collide when
* opening JMX servers on the same port. Server will start upon instantiation.
*
* Note: This server starts the JMX server, which runs in a separate thread and must be stopped or it will prevent
* the process from ending.
*
* @param requestedPort Port on which to start JMX server, 0 for ephemeral
*/
class JmxServer(requestedPort: Int) extends Logging {
val hostname = Util.getLocalHost.getHostName
def this() = this(0)
// Instance construction
val (jmxServer, url, registryPort, serverPort) = {
// An RMIServerSocketFactory that will tell what port it opened up. Imagine that.
class UpfrontRMIServerSocketFactory extends RMIServerSocketFactory {
var lastSS: ServerSocket = null
def createServerSocket(port: Int): ServerSocket = {
lastSS = new ServerSocket(port)
lastSS
}
}
// Check if the system property has been set and, if not, set it to what we need. Warn otherwise.
def updateSystemProperty(prop: String, value: String) = {
val existingProp = System.getProperty(prop)
if (existingProp == null) {
debug("Setting new system property of %s to %s" format (prop, value))
System.setProperty(prop, value)
} else {
info("Not overriding system property %s as already has value %s" format (prop, existingProp))
}
}
if (System.getProperty("com.sun.management.jmxremote") != null) {
warn("System property com.sun.management.jmxremote has been specified, starting the JVM's JMX server as well. " +
"This behavior is not well defined and our values will collide with any set on command line.")
}
info("According to Util.getLocalHost.getHostName we are " + hostname)
updateSystemProperty("com.sun.management.jmxremote.authenticate", "false")
updateSystemProperty("com.sun.management.jmxremote.ssl", "false")
updateSystemProperty("java.rmi.server.hostname", hostname)
// There's still a race condition in which this port is released and taken by
// another process before it is used for our server, but that will be even more
// rare than the collisions we experienced when incrementing the registry port.
def getEphemeralPort = {
val serverSocket = new ServerSocket(0)
try {
serverSocket.getLocalPort
} finally {
serverSocket.close()
}
}
val ssFactory = new UpfrontRMIServerSocketFactory
LocateRegistry.createRegistry(requestedPort, null, ssFactory)
val registryPort = ssFactory.lastSS.getLocalPort
val serverPort = getEphemeralPort
val mbs = ManagementFactory.getPlatformMBeanServer
val env = new util.HashMap[String, Object]()
val url = new JMXServiceURL("service:jmx:rmi://localhost:" + serverPort + "/jndi/rmi://localhost:" + registryPort + "/jmxrmi")
val jmxServer = JMXConnectorServerFactory.newJMXConnectorServer(url, env, mbs)
(jmxServer, url.toString, registryPort, serverPort)
}
jmxServer.start
startupLog("Started " + toString)
startupLog("If you are tunneling, you might want to try " + toString.replaceAll("localhost", hostname))
println(s"JMX Server: ${toString.replaceAll("localhost", hostname)}")
/**
* Get RMI registry port
* @return RMI port
*/
def getRegistryPort = registryPort
/**
* Get JMX server port
*/
def getServerPort = serverPort
/**
* Get Jmx URL for this server
* @return Jmx-Style URL string
*/
def getJmxUrl = url
/**
* Stop the JMX server. Must be called at program end or will prevent termination.
*/
def stop = jmxServer.stop
override def toString = "JmxServer registry port=%d server port=%d url=%s" format (getRegistryPort, getServerPort, getJmxUrl)
def getTunnelingJmxUrl = getJmxUrl.replaceAll("localhost", hostname)
}